Dripline-Cpp  v2.10.11
Dripline Implementation in C++
agent.cc
Go to the documentation of this file.
1 /*
2  * agent.cc
3  *
4  * Created on: Jun 2, 2016
5  * Author: nsoblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "agent.hh"
11 
12 #include "agent_config.hh"
13 #include "core.hh"
14 #include "dripline_constants.hh"
15 #include "dripline_exceptions.hh"
16 #include "dripline_version.hh"
17 #include "receiver.hh"
18 #include "uuid.hh"
19 
20 #include "logger.hh"
21 #include "param_codec.hh"
22 #include "path.hh"
23 #include "signal_handler.hh"
24 
25 #include <algorithm> // for min
26 #include <string>
27 
28 // In Windows there's a preprocessor macro called uuid_t that conflicts with this typdef
29 #ifdef uuid_t
30 #undef uuid_t
31 #endif
32 
33 using scarab::param;
34 using scarab::param_array;
35 using scarab::param_node;
36 using scarab::param_ptr_t;
37 using scarab::param_value;
38 
39 namespace dripline
40 {
41  LOGGER( dlog, "agent" );
42 
44  f_is_dry_run( false ),
45  f_routing_key(),
46  f_specifier(),
47  f_lockout_key( generate_nil_uuid() ),
48  f_return_code( dl_success().rc_value() ),
49  f_return_message(),
50  f_timeout( 0 ),
51  f_suppress_output( false ),
52  f_json_print( false ),
53  f_pretty_print( false ),
54  f_save_filename(),
55  f_reply(),
56  f_return( dl_client_error().rc_value() )
57  {
58  }
59 
60  void agent::sub_agent::execute( const scarab::param_node& a_config, const scarab::authentication& a_auth )
61  {
62  const scarab::param_array a_ord_args;
63  execute( a_config, a_ord_args, a_auth );
64  }
65 
66  void agent::sub_agent::execute( const scarab::param_node& a_config, const scarab::param_array& a_ord_args, const scarab::authentication& a_auth )
67  {
68  LINFO( dlog, "Creating message" );
69 
70  // create a copy of the config that will be pared down by removing expected elements
71  param_node t_config( a_config );
72 
73  param_node t_dripline_node;
74  if( t_config.has( "dripline_mesh" ) )
75  {
76  t_dripline_node = std::move(t_config.remove( "dripline_mesh" )->as_node());
77  }
78 
79  core t_core( t_dripline_node, a_auth );
80 
81  t_config.remove( "auth_file" );
82  t_config.remove( "auth_groups" );
83 
84  f_agent->set_timeout( t_config.get_value( "timeout", 10U ) * 1000 ); // convert seconds (dripline agent user interface) to milliseconds (expected by SimpleAmqpClient)
85  t_config.erase( "timeout" );
86  f_agent->set_json_print( t_config.get_value( "json_print", f_agent->get_json_print() ) );
87  t_config.erase( "json_print" );
88  f_agent->set_pretty_print( t_config.get_value( "pretty_print", f_agent->get_pretty_print() ) );
89  t_config.erase( "pretty_print" );
90  f_agent->set_suppress_output( t_config.get_value( "suppress_output", f_agent->get_suppress_output() ) );
91  t_config.erase( "suppress_output" );
92 
93  f_agent->routing_key() = t_config.get_value( "rk", f_agent->routing_key() );
94  t_config.erase( "rk" );
95 
96  f_agent->specifier() = t_config.get_value( "specifier", f_agent->specifier() );
97  t_config.erase( "specifier" );
98 
99  if( t_config.has( "lockout_key" ) )
100  {
101  bool t_lk_valid = true;
102  f_agent->lockout_key() = dripline::uuid_from_string( t_config["lockout_key"]().as_string(), t_lk_valid );
103  t_config.erase( "lockout_key" );
104  if( ! t_lk_valid )
105  {
106  LERROR( dlog, "Invalid lockout key provided: <" << t_config.get_value( "lockout_key", "" ) << ">" );
107  f_agent->set_return( dl_client_error().rc_value() );
108  return;
109  }
110  }
111 
112  if( t_config.has( "return" ) )
113  {
114  f_agent->set_return_code( t_config["return"].as_node().get_value( "code", dl_success().rc_value() ) );
115  f_agent->return_message() = t_config["return"].as_node().get_value( "message", "" );
116  t_config.erase( "return" );
117  }
118 
119  f_agent->save_filename() = t_config.get_value( "save", "" );
120  t_config.erase( "save" );
121 
122  // load the values array, merged in the proper order
123  scarab::param_array t_values;
124  if( t_config.has( "values" ) )
125  {
126  t_values.merge( t_config["values"].as_array() );
127  t_config.erase( "values" );
128  }
129  t_values.merge( a_ord_args );
130  if( t_config.has( "option_values" ) )
131  {
132  t_values.merge( t_config["option_values"].as_array() );
133  t_config.erase( "option_values" );
134  }
135  if( ! t_values.empty() )
136  {
137  t_config.add( "values", t_values );
138  }
139 
140  // check if this is meant to be a dry run message
141  if( t_config.has( "dry_run_msg" ) )
142  {
143  f_agent->set_is_dry_run( t_config["dry_run_msg"]().as_bool() );
144  t_config.erase( "dry_run_msg" );
145  }
146 
147  this->create_and_send_message( t_config, t_core );
148 
149  return;
150  }
151 
152  void agent::sub_agent_request::create_and_send_message( scarab::param_node& a_config, const core& a_core )
153  {
154  // create the request
155  request_ptr_t t_request = this->create_request( a_config );
156  LDEBUG( dlog, "message payload to send is: " << t_request->payload() );
157 
158  if( ! t_request )
159  {
160  LERROR( dlog, "Unable to create request" );
161  f_agent->set_return( dl_client_error_invalid_request().rc_value() );
162  return;
163  }
164 
165  // if this is a dry run, we print the message and stop here
166  if( f_agent->get_is_dry_run() )
167  {
168  LPROG( dlog, "Request (routing key = " << f_agent->routing_key() << "; specifier = " << f_agent->specifier() << "):\n" << *t_request );
169  f_agent->set_return( dl_warning_dry_run().rc_value() );
170  return;
171  }
172 
173  // now all that remains in f_config should be values to pass to the server as arguments to the request
174 
175  t_request->lockout_key() = f_agent->lockout_key();
176 
177  LINFO( dlog, "Sending message w/ message_operation = " << t_request->get_message_operation() << " to " << t_request->routing_key() );
178  LDEBUG( dlog, "Message headers:\n" << t_request->get_message_param( false ) );
179 
180  sent_msg_pkg_ptr t_receive_reply;
181  try
182  {
183  t_receive_reply = a_core.send( t_request );
184  }
185  catch( message_ptr_t )
186  {
187  LWARN( dlog, "Operating in offline mode; message not sent" );
188  f_agent->set_return( dl_warning_offline().rc_value() );
189  return;
190  }
191  catch( connection_error& e )
192  {
193  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() );
194  f_agent->set_return( dl_amqp_error_broker_connection().rc_value() );
195  return;
196  }
197  catch( dripline_error& e )
198  {
199  LERROR( dlog, "Unable to send request:\n" << e.what() );
200  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
201  return;
202  }
203 
204  if( ! t_receive_reply->f_successful_send )
205  {
206  LERROR( dlog, "Unable to send request:\n" + t_receive_reply->f_send_error_message );
207  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
208  return;
209  }
210 
211  if( ! t_receive_reply->f_consumer_tag.empty() ) // this indicates that the reply queue was created, and we've started consuming on it; we should wait for a reply
212  {
213  LINFO( dlog, "Waiting for a reply from the server; use ctrl-c to cancel" );
214 
215  // timed blocking call to wait for incoming message
216  receiver t_msg_receiver;
218  auto t_rec_cancel_wrap = wrap_cancelable( t_msg_receiver );
219  scarab::signal_handler::add_cancelable( t_rec_cancel_wrap );
220  dripline::reply_ptr_t t_reply = t_msg_receiver.wait_for_reply( t_receive_reply, t_post_listen_status, f_agent->get_timeout() );
221 
222  if( t_msg_receiver.is_canceled() )
223  {
224  LDEBUG( dlog, "Agent canceled while waiting for reply" );
225  f_agent->set_return( dl_success().rc_value() );
226  }
227  else if( t_reply )
228  {
229  LINFO( dlog, "Response received" );
230  f_agent->set_return( t_reply->get_return_code() );
231 
232  const param& t_payload = t_reply->payload();
233 
234  LPROG( dlog, "Response:\n" <<
235  "Return code: " << t_reply->get_return_code() << '\n' <<
236  "Return message: " << t_reply->return_message() << '\n' <<
237  t_payload );
238 
239  if( ! f_agent->get_suppress_output() )
240  {
241  if( ! f_agent->get_json_print() && ! f_agent->get_pretty_print() )
242  {
243  std::cout << *t_reply << std::endl;
244  }
245  else
246  {
247  param_node t_encoding_options;
248  if( f_agent->get_pretty_print() )
249  {
250  t_encoding_options.add( "style", "pretty" );
251  }
252  std::string t_encoded_message = t_reply->encode_full_message( 5000, t_encoding_options );
253  std::cout << t_encoded_message << std::endl;
254  }
255  }
256 
257  if( ! f_agent->save_filename().empty() && ! t_payload.is_null() )
258  {
259  scarab::param_translator t_translator;
260  if( ! t_translator.write_file( t_payload, f_agent->save_filename() ) )
261  {
262  LERROR( dlog, "Unable to write out payload" );
263  f_agent->set_return( dl_client_error_handling_reply().rc_value() );
264  }
265  }
266  }
267  else
268  {
269  if( t_post_listen_status == core::post_listen_status::timeout )
270  {
271  LWARN( dlog, "Timed out or while waiting for reply" );
272  f_agent->set_return( dl_client_error_timeout().rc_value() );
273  }
274  else
275  {
276  if( t_post_listen_status == core::post_listen_status::hard_error )
277  {
278  LERROR( dlog, "Error while waiting for reply" );
279  }
280  else
281  {
282  LERROR( dlog, "Unknown state while waiting for reply: " << (int)t_post_listen_status );
283  }
284  f_agent->set_return( dl_client_error().rc_value() );
285  }
286  }
287  f_agent->set_reply( t_reply );
288  }
289  else
290  {
291  f_agent->set_return( dl_success().rc_value() );
292  }
293 
294  return;
295  }
296 
297  void agent::sub_agent_reply::create_and_send_message( scarab::param_node& a_config, const core& a_core )
298  {
299  // create the alert
300  param_ptr_t t_payload_ptr( new param_node( a_config ) );
301 
302  reply_ptr_t t_reply = msg_reply::create( f_agent->get_return_code(),
303  f_agent->return_message(),
304  std::move(t_payload_ptr),
305  f_agent->routing_key(),
306  f_agent->specifier() );
307  LDEBUG( dlog, "reply payload to send is: " << t_reply->payload() );
308 
309  if( ! t_reply )
310  {
311  LERROR( dlog, "Unable to create reply" );
312  f_agent->set_return( dl_client_error_invalid_request().rc_value() );
313  return;
314  }
315 
316  // if this is a dry run, we print the message and stop here
317  if( f_agent->get_is_dry_run() )
318  {
319  LPROG( dlog, "Reply (routing key = " << f_agent->routing_key() << "; specifier = " << f_agent->specifier() << "):\n" << *t_reply );
320  f_agent->set_return( dl_warning_dry_run().rc_value() );
321  return;
322  }
323 
324  LINFO( dlog, "Sending reply with return code <" << t_reply->get_return_code() << "> and message <" << t_reply->return_message() << "> to key " << t_reply->routing_key() );
325  LDEBUG( dlog, "Message headers:\n" << t_reply->get_message_param( false ) );
326 
327  sent_msg_pkg_ptr t_msg_sent;
328  try
329  {
330  t_msg_sent = a_core.send( t_reply );
331  }
332  catch( message_ptr_t )
333  {
334  LWARN( dlog, "Operating in offline mode; message not sent" );
335  f_agent->set_return( dl_warning_offline().rc_value() );
336  return;
337  }
338  catch( connection_error& e )
339  {
340  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() );
341  f_agent->set_return( dl_amqp_error_broker_connection().rc_value() );
342  return;
343  }
344  catch( dripline_error& e )
345  {
346  LERROR( dlog, "Unable to send reply:\n" << e.what() );
347  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
348  return;
349  }
350 
351  if( ! t_msg_sent->f_successful_send )
352  {
353  LERROR( dlog, "Unable to send reply:\n" + t_msg_sent->f_send_error_message );
354  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
355  }
356  else
357  {
358  f_agent->set_return( dl_success().rc_value() );
359  }
360 
361  return;
362  }
363 
364  void agent::sub_agent_alert::create_and_send_message( scarab::param_node& a_config, const core& a_core )
365  {
366  // create the alert
367  param_ptr_t t_payload_ptr( new param_node( a_config ) );
368 
369  alert_ptr_t t_alert = msg_alert::create( std::move(t_payload_ptr),
370  f_agent->routing_key(),
371  f_agent->specifier() );
372  LDEBUG( dlog, "alert payload to send is: " << t_alert->payload() );
373 
374  if( ! t_alert )
375  {
376  LERROR( dlog, "Unable to create alert" );
377  f_agent->set_return( dl_client_error_invalid_request().rc_value() );
378  return;
379  }
380 
381  // if this is a dry run, we print the message and stop here
382  if( f_agent->get_is_dry_run() )
383  {
384  LPROG( dlog, "Alert (routing key = " << f_agent->routing_key() << "; specifier = " << f_agent->specifier() << "):\n" << *t_alert );
385  f_agent->set_return( dl_warning_dry_run().rc_value() );
386  return;
387  }
388 
389  LINFO( dlog, "Sending alert with key " << t_alert->routing_key() );
390  LDEBUG( dlog, "Message headers:\n" << t_alert->get_message_param( false ) );
391 
392  sent_msg_pkg_ptr t_msg_sent;
393  try
394  {
395  t_msg_sent = a_core.send( t_alert );
396  }
397  catch( message_ptr_t )
398  {
399  LWARN( dlog, "Operating in offline mode; message not sent" );
400  f_agent->set_return( dl_warning_offline().rc_value() );
401  return;
402  }
403  catch( connection_error& e )
404  {
405  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() );
406  f_agent->set_return( dl_amqp_error_broker_connection().rc_value() );
407  return;
408  }
409  catch( dripline_error& e )
410  {
411  LERROR( dlog, "Unable to send alert:\n" << e.what() );
412  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
413  return;
414  }
415 
416  if( ! t_msg_sent->f_successful_send )
417  {
418  LERROR( dlog, "Unable to send alert:\n" + t_msg_sent->f_send_error_message );
419  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
420  }
421  else
422  {
423  f_agent->set_return( dl_success().rc_value() );
424  }
425 
426  return;
427  }
428 
429  request_ptr_t agent::sub_agent_get::create_request( scarab::param_node& a_config )
430  {
431  param_ptr_t t_payload_ptr( new param_node( a_config ) );
432 
433  return msg_request::create( std::move(t_payload_ptr),
434  op_t::get,
435  f_agent->routing_key(),
436  f_agent->specifier() );
437  }
438 
439  request_ptr_t agent::sub_agent_set::create_request( scarab::param_node& a_config )
440  {
441  // require the values array
442  if( ! a_config.has( "values" ) )
443  {
444  LERROR( dlog, "No \"values\" option given" );
445  return nullptr;
446  }
447 
448  param_ptr_t t_payload_ptr( new param_node( a_config ) );
449 
450  return msg_request::create( std::move(t_payload_ptr),
451  op_t::set,
452  f_agent->routing_key(),
453  f_agent->specifier() );
454  }
455 
457  {
458  param_ptr_t t_payload_ptr( new param_node() );
459  param_node& t_payload_node = t_payload_ptr->as_node();
460 
461  // for the load instruction, the instruction node should be replaced by the contents of the file specified
462  if( a_config.has( "load" ) )
463  {
464  if( ! a_config["load"].as_node().has( "json" ) )
465  {
466  LERROR( dlog, "Load instruction did not contain a valid file type");
467  return nullptr;
468  }
469 
470  std::string t_load_filename( a_config["load"]().as_string() );
471  scarab::param_translator t_translator;
472  scarab::param_ptr_t t_node_from_file = t_translator.read_file( t_load_filename );
473  if( t_node_from_file == nullptr || ! t_node_from_file->is_node() )
474  {
475  LERROR( dlog, "Unable to read JSON file <" << t_load_filename << ">" );
476  return nullptr;
477  }
478 
479  t_payload_node.merge( t_node_from_file->as_node() );
480  a_config.erase( "load" );
481  }
482 
483  // at this point, all that remains in a_config should be other options that we want to add to the payload node
484  t_payload_node.merge( a_config );
485 
486  return msg_request::create( std::move(t_payload_ptr),
487  op_t::cmd,
488  f_agent->routing_key(),
489  f_agent->specifier() );
490  }
491 
492 } /* namespace dripline */
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
Definition: agent.cc:364
virtual request_ptr_t create_request(scarab::param_node &a_config)
Definition: agent.cc:456
virtual request_ptr_t create_request(scarab::param_node &a_config)
Definition: agent.cc:429
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
Definition: agent.cc:297
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
Definition: agent.cc:152
virtual request_ptr_t create_request(scarab::param_node &a_config)
Definition: agent.cc:439
void execute(const scarab::param_node &a_config, const scarab::authentication &a_auth)
Definition: agent.cc:60
Error indicating a problem with the connection to the broker.
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
Definition: core.hh:75
virtual sent_msg_pkg_ptr send(request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
Definition: core.cc:152
post_listen_status
Definition: core.hh:80
@ timeout
A timeout occurred, and the channel is still valid.
@ unknown
Initialized or unknown status.
@ hard_error
An error occurred, and the channel is no longer valid.
Dripline-specific errors.
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
Definition: message.cc:543
static reply_ptr_t create(const return_code &a_return_code, const std::string &a_ret_msg, scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Create a reply message using a return_code object and manually specifying the destination.
static request_ptr_t create(scarab::param_ptr_t a_payload, op_t a_msg_op, const std::string &a_routing_key, const std::string &a_specifier="", const std::string &a_reply_to="", message::encoding a_encoding=encoding::json)
Create a request message.
Definition: message.cc:482
A receiver is able to collect Dripline message chunks and reassemble them into a complete Dripline me...
Definition: receiver.hh:78
reply_ptr_t wait_for_reply(const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0)
Definition: receiver.cc:206
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
uuid_t uuid_from_string(const std::string &a_id_str)
Definition: uuid.cc:31
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
uuid_t generate_nil_uuid()
Generates a UUID containing all 0s.
Definition: uuid.cc:26