Dripline-Cpp  v2.10.11
Dripline Implementation in C++
core.cc
Go to the documentation of this file.
1 /*
2  * core.cc
3  *
4  * Created on: Jun 27, 2017
5  * Author: N.S. Oblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "core.hh"
11 
12 #include "dripline_exceptions.hh"
13 #include "message.hh"
14 
15 #include "authentication.hh"
16 #include "exponential_backoff.hh"
17 #include "logger.hh"
18 #include "param_codec.hh"
19 #include "signal_handler.hh"
20 
21 #include <array>
22 
23 
24 namespace dripline
25 {
26 
27  LOGGER( dlog, "amqp" );
28 
30  {
31  if( f_channel )
32  {
33  try
34  {
35  LDEBUG( dlog, "Stopping consuming messages" );
36  f_channel->BasicCancel( f_consumer_tag );
37  }
38  catch( amqp_exception& e )
39  {
40  LERROR( dlog, "AMQP exception caught while canceling the channel: (" << e.reply_code() << ") " << e.reply_text() );
41  }
42  catch( amqp_lib_exception& e )
43  {
44  LERROR( dlog, "AMQP library exception caught while canceling the channel: (" << e.ErrorCode() << ") " << e.what() );
45  }
46  }
47  }
48 
49  bool core::s_offline = false;
50 
51  core::core( const scarab::param_node& a_config, const scarab::authentication& a_auth, const bool a_make_connection ) :
52  f_address(),
53  f_port(),
54  f_username(),
55  f_password(),
56  f_requests_exchange(),
57  f_alerts_exchange(),
58  f_heartbeat_routing_key(),
59  f_max_payload_size(),
60  f_make_connection(),
61  f_max_connection_attempts()
62  {
63  // Get the default values, and merge in the supplied a_config
64  // a_config's default value is also dripline_config, but the user can supply an arbitrary node.
65  // So we need to assume no configuration values are supplied and we start again from dripline_config, then merge in a_config.
66  dripline_config t_config;
67  t_config.merge( a_config );
68  LDEBUG( dlog, "Dripline core being configured with:\n" << t_config );
69 
70 /* DO WE WANT TO USE ALTERNATIVE AUTH GROUPS?
71  std::array< std::string > t_potential_groups{"dripline", "amqp", "rabbitmq"};
72  std::string t_auth_group;
73  for( const auto& i_gr : t_potential_goups )
74  {
75  if( a_auth.has( i_gr ) )
76  {
77  t_auth_group = i_gr;
78  break;
79  }
80  }
81  LDEBUG( dlog, "Using auth group <" << t_auth_group << ">" );
82 
83  f_username = t_auth.get( t_auth_group, "username", f_username );
84  f_password = t_auth.get( t_auth_group, "password", f_password );
85 */
86  // Replace local parameters with values from the config
87  f_address = t_config["broker"]().as_string(); //.get_value("broker", "localhost");
88  f_port = t_config["broker_port"]().as_uint(); //.get_value("broker_port", 5672);
89  f_requests_exchange = t_config["requests_exchange"]().as_string(); //.get_value("requests_exchange", "requests");
90  f_alerts_exchange = t_config["alerts_exchange"]().as_string(); //.get_value("alerts_exchange", "alerts");
91  f_heartbeat_routing_key = t_config["heartbeat_routing_key"]().as_string(); //.get_value("heartbeat_routing_key", "heartbeat");
92  f_make_connection = t_config.get_value( "make_connection", a_make_connection );
93  f_max_payload_size = t_config["max_payload_size"]().as_uint(); //.get_value("max_payload_size", DL_MAX_PAYLOAD_SIZE);
94  f_max_connection_attempts = t_config["max_connection_attempts"]().as_uint(); //.get_value("max_connection_attempts", 10);
95 
96  f_username = a_auth.get("dripline", "username", "guest");
97  f_password = a_auth.get("dripline", "password", "guest");
98 
99  // additional return codes
100  if( t_config.has( "return_codes" ) )
101  {
102  // define a function for extracting return codes from a param_array so that we can use it in a couple places
103  auto t_extract_codes = [](const scarab::param_array& a_codes)
104  {
105  LDEBUG( dlog, "Adding return codes:\n" << a_codes );
106  for( auto t_code_it = a_codes.begin(); t_code_it != a_codes.end(); ++t_code_it )
107  {
108  try
109  {
110  const scarab::param_node& t_a_node = t_code_it->as_node();
111  if( check_and_add_return_code( t_a_node["value"]().as_uint(), t_a_node["name"]().as_string(), t_a_node["description"]().as_string() ) )
112  {
113  LDEBUG( dlog, "Added return code <" << t_a_node["name"]().as_string() << " (" << t_a_node["value"]().as_uint() << ")>: " << t_a_node["description"]().as_string() );
114  }
115  }
116  catch( const scarab::error& e )
117  {
118  throw dripline_error() << "Invalid configuration for a return code:\n" << *t_code_it << '\n' << e.what();
119  }
120  catch( const std::out_of_range& e )
121  {
122  throw dripline_error() << "Missing configuration parameter for a return code:\n" << *t_code_it;
123  }
124  }
125  return;
126  };
127 
128  if( t_config["return_codes"].is_value() && t_config["return_codes"]().is_string() )
129  {
130  // then it's a filename; load YAML
131  std::string t_filename( t_config["return_codes"]().as_string() );
132  scarab::param_translator t_translator;
133  scarab::param_ptr_t t_ret_codes = t_translator.read_file( t_filename );
134  if( ! t_ret_codes || ! t_ret_codes->is_array() )
135  {
136  throw dripline_error() << "Could not find or open return-code config file, or the config does not contain an array: " << t_filename;
137  }
138  t_extract_codes( t_ret_codes->as_array() );
139  }
140  else if( t_config["return_codes"].is_array() )
141  {
142  // then individual codes are specified
143  t_extract_codes( t_config["return_codes"].as_array() );
144  }
145  else
146  {
147  throw dripline_error() << "Return code configuration is invalid:\n" << t_config["return_codes"];
148  }
149  }
150  }
151 
153  {
154  LDEBUG( dlog, "Sending request with routing key <" << a_request->routing_key() << ">" );
155  if ( ! f_make_connection || core::s_offline )
156  {
157  throw a_request;
158  //throw dripline_error() << "cannot send reply when make_connection is false";
159  }
160  return do_send( std::static_pointer_cast< message >( a_request ), f_requests_exchange, true, a_channel );
161  }
162 
164  {
165  LDEBUG( dlog, "Sending reply with routing key <" << a_reply->routing_key() << ">" );
166  if ( ! f_make_connection || core::s_offline )
167  {
168  throw a_reply;
169  //throw dripline_error() << "cannot send reply when make_connection is false";
170  }
171  return do_send( std::static_pointer_cast< message >( a_reply ), f_requests_exchange, false, a_channel );
172  }
173 
175  {
176  LDEBUG( dlog, "Sending alert with routing key <" << a_alert->routing_key() << ">" );
177  if ( ! f_make_connection || core::s_offline )
178  {
179  throw a_alert;
180  //throw dripline_error() << "cannot send reply when make_connection is false";
181  }
182  return do_send( std::static_pointer_cast< message >( a_alert ), f_alerts_exchange, false, a_channel );
183  }
184 
185  sent_msg_pkg_ptr core::do_send( message_ptr_t a_message, const std::string& a_exchange, bool a_expect_reply, amqp_channel_ptr a_channel ) const
186  {
187  // throws connection_error if it could not connect with the broker
188  // throws dripline_error if there's a problem with the exchange or creating the AMQP message object(s)
189  // returns the receive_reply package if the message was completely or partially sent
190  // the f_successful_send flag will be set accordingly: true if completely sent; false if partially sent
191  // if there was an error sending the message, that will be returned in f_send_error_message, which will be empty otherwise
192 
193  // lambda to create a string with the basic information about the send attempt
194  auto t_diagnostic_string_maker = [a_message, this]() -> std::string {
195  return std::string("Broker: ") + f_address +"\nPort: " + std::to_string(f_port) + "\nRouting Key: " + a_message->routing_key();
196  };
197 
198  amqp_channel_ptr t_channel = a_channel ? a_channel : open_channel();
199  if( ! t_channel )
200  {
201  throw connection_error() << "Unable to open channel to send message\n" << t_diagnostic_string_maker();
202  }
203 
204  if( ! setup_exchange( t_channel, a_exchange ) )
205  {
206  throw dripline_error() << "Unable to setup the exchange <" << a_exchange << "> to send message\n" << t_diagnostic_string_maker();
207  }
208 
209  // create empty receive-reply object
210  sent_msg_pkg_ptr t_receive_reply = std::make_shared< sent_msg_pkg >();
211  std::unique_lock< std::mutex > t_rr_lock( t_receive_reply->f_mutex );
212 
213  if( a_expect_reply )
214  {
215  t_receive_reply->f_channel = t_channel;
216 
217  // create the reply-to queue, and bind the queue to the routing key over the given exchange
218  std::string t_reply_to = t_channel->DeclareQueue( "" );
219  t_channel->BindQueue( t_reply_to, a_exchange, t_reply_to );
220  // set the reply-to in the message because now we have the queue to which to reply
221  a_message->reply_to() = t_reply_to;
222 
223  // begin consuming on the reply-to queue
224  t_receive_reply->f_consumer_tag = t_channel->BasicConsume( t_reply_to );
225  LDEBUG( dlog, "Reply-to for request: " << t_reply_to );
226  LDEBUG( dlog, "Consumer tag for reply: " << t_receive_reply->f_consumer_tag );
227  }
228 
229  // convert the dripline::message object to an AMQP message
230  amqp_split_message_ptrs t_amqp_messages = a_message->create_amqp_messages( f_max_payload_size );
231  if( t_amqp_messages.empty() )
232  {
233  throw dripline_error() << "Unable to convert the dripline::message object to AMQP message(s) to be sent\n" << t_diagnostic_string_maker();
234  }
235 
236  try
237  {
238  LDEBUG( dlog, "Sending message to <" << a_message->routing_key() << ">" );
239  for( amqp_message_ptr& t_amqp_message : t_amqp_messages )
240  {
241  // send the message
242  // the first boolean argument is whether it's mandatory that the message be delivered to a queue.
243  // this is only the case for requests, where we expect something to be listening.
244  t_channel->BasicPublish( a_exchange, a_message->routing_key(), t_amqp_message, a_message->is_request(), false );
245  }
246  LDEBUG( dlog, "Message sent in " << t_amqp_messages.size() << " chunks" );
247  t_receive_reply->f_successful_send = true;
248  t_receive_reply->f_send_error_message.clear();
249  }
250  catch( AmqpClient::ConnectionClosedException& e )
251  {
252  LERROR( dlog, "Unable to send message because the connection is closed: " << e.what() );
253  throw connection_error() << "Unable to send message because the connection is closed: " << e.what() << '\n' << t_diagnostic_string_maker();
254  }
255  catch( AmqpClient::AmqpLibraryException& e )
256  {
257  LERROR( dlog, "AMQP error while sending message: " << e.what() );
258  t_receive_reply->f_successful_send = false;
259  t_receive_reply->f_send_error_message = std::string("AMQP error while sending message: ") + std::string(e.what()) + '\n' + t_diagnostic_string_maker();
260  }
261  catch( AmqpClient::MessageReturnedException& e )
262  {
263  LERROR( dlog, "Message was returned: " << e.what() );
264  t_receive_reply->f_successful_send = false;
265  t_receive_reply->f_send_error_message = std::string("Message was returned: ") + std::string(e.what()) + '\n' + t_diagnostic_string_maker();
266  }
267  catch( std::exception& e )
268  {
269  LERROR( dlog, "Error while sending message: " << e.what() );
270  t_receive_reply->f_successful_send = false;
271  t_receive_reply->f_send_error_message = std::string("Error while sending message: ") + std::string(e.what()) + '\n' + t_diagnostic_string_maker();
272  }
273 
274  return t_receive_reply;
275  }
276 
278  {
279  // Exceptions that can be encountered while opening a channel
280  // SimpleAmqpClient::Channel::Open(opts)
281  // std::runtime_error -- options are invalid; auth not specified
282  // std::logic_error -- unhandled auth type
283  // std::bad_alloc -- connection is null
284  // amqp_exception -- unsure of what would cause this
285  // amqp_lib_exception -- unable to make connection to the broker; maybe other things
286 
287  if ( ! f_make_connection || core::s_offline )
288  {
289  return amqp_channel_ptr();
290  //throw dripline_error() << "Should not call open_channel when offline";
291  }
292 
293  amqp_channel_ptr t_ret_ptr = amqp_channel_ptr();
294 
295  auto t_open_conn_fcn = [&]()->bool
296  {
297  try
298  {
299  LINFO( dlog, "Opening AMQP connection and creating channel to " << f_address << ":" << f_port );
300  LDEBUG( dlog, "Using broker authentication: " << f_username << ":" << f_password );
301  struct AmqpClient::Channel::OpenOpts opts;
302  opts.host = f_address;
303  opts.port = f_port;
304  opts.auth = AmqpClient::Channel::OpenOpts::BasicAuth(f_username, f_password);
305  t_ret_ptr = AmqpClient::Channel::Open( opts );
306  return true;
307  }
308  catch( amqp_exception& e )
309  {
310  if( e.is_soft_error() )
311  {
312  LWARN( dlog, "Recoverable AMQP exception caught while opening channel: (" << e.reply_code() << ") " << e.reply_text() );
313  return false;
314  }
315  // otherwise error is non-recoverable
316  throw;
317  }
318  catch( amqp_lib_exception& e )
319  {
320  LERROR( dlog, "AMQP Library Exception caught while creating channel: (" << e.ErrorCode() << ") " << e.what() );
321  if( e.ErrorCode() == -9 )
322  {
323  LERROR( dlog, "This error means the client could not connect to the broker.\n" <<
324  "Check that you have the address and port correct, and that the broker is running.")
325  }
326  return false;
327  }
328  // std::exceptions are non-recoverable, so don't catch them
329  };
330 
331  scarab::exponential_backoff<> t_open_conn_backoff( t_open_conn_fcn, f_max_connection_attempts );
332  auto t_exp_cancel_wrap = wrap_cancelable( t_open_conn_backoff );
333  scarab::signal_handler::add_cancelable( t_exp_cancel_wrap );
334 
335  int t_expback_return = 0;
336  try
337  {
338  LDEBUG( dlog, "Attempting to open channel; will make up to " << f_max_connection_attempts << " attempts" );
339  t_expback_return = t_open_conn_backoff.go();
340  // either succeeded or failed after multiple attempts
341  }
342  catch( amqp_exception& e )
343  {
344  LERROR( dlog, "Unrecoverable AMQP exception caught while opening channel: (" << e.reply_code() << ") " << e.reply_text() );
345  }
346  catch(const std::exception& e)
347  {
348  // unrecoverable error causing a std::exception
349  LERROR( dlog, "Standard exception caught while creating channel: " << e.what() );
350  }
351 
352  if( t_expback_return == 0 )
353  {
354  LERROR( dlog, "Failed to open a channel; no more attempts will be made" );
355  }
356 
357  return t_ret_ptr;
358  }
359 
360  bool core::setup_exchange( amqp_channel_ptr a_channel, const std::string& a_exchange )
361  {
362  if( s_offline || ! a_channel )
363  {
364  return false;
365  }
366 
367  try
368  {
369  LDEBUG( dlog, "Declaring exchange <" << a_exchange << ">" );
370  a_channel->DeclareExchange( a_exchange, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC, false, false, false );
371  return true;
372  }
373  catch( amqp_exception& e )
374  {
375  LERROR( dlog, "AMQP exception caught while declaring exchange: (" << e.reply_code() << ") " << e.reply_text() );
376  return false;
377  }
378  catch( amqp_lib_exception& e )
379  {
380  LERROR( dlog, "AMQP library exception caught while declaring exchange: (" << e.ErrorCode() << ") " << e.what() );
381  return false;
382  }
383  }
384 
385  bool core::setup_queue( amqp_channel_ptr a_channel, const std::string& a_queue_name )
386  {
387  if( s_offline || ! a_channel )
388  {
389  return false;
390  }
391 
392  try
393  {
394  LDEBUG( dlog, "Declaring queue <" << a_queue_name << ">" );
395  a_channel->DeclareQueue( a_queue_name, false, false, true, true );
396  return true;
397  }
398  catch( amqp_exception& e )
399  {
400  LERROR( dlog, "AMQP exception caught while declaring queue: (" << e.reply_code() << ") " << e.reply_text() );
401  return false;
402  }
403  catch( amqp_lib_exception& e )
404  {
405  LERROR( dlog, "AMQP library exception caught while declaring queue: (" << e.ErrorCode() << ") " << e.what() );
406  return false;
407  }
408 
409  }
410 
411  bool core::bind_key( amqp_channel_ptr a_channel, const std::string& a_exchange, const std::string& a_queue_name, const std::string& a_routing_key )
412  {
413  if( s_offline || ! a_channel )
414  {
415  return false;
416  }
417 
418  try
419  {
420  LDEBUG( dlog, "Binding key <" << a_routing_key << "> to queue <" << a_queue_name << "> over exchange <" << a_exchange << ">" );
421  a_channel->BindQueue( a_queue_name, a_exchange, a_routing_key );
422 
423  return true;
424  }
425  catch( amqp_exception& e )
426  {
427  LERROR( dlog, "AMQP exception caught while declaring binding key <" << a_routing_key << ">: (" << e.reply_code() << ") " << e.reply_text() );
428  return false;
429  }
430  catch( amqp_lib_exception& e )
431  {
432  LERROR( dlog, "AMQP library exception caught while binding key <" << a_routing_key << ">: (" << e.ErrorCode() << ") " << e.what() );
433  return false;
434  }
435  }
436 
437  std::string core::start_consuming( amqp_channel_ptr a_channel, const std::string& a_queue_name )
438  {
439  if( s_offline || ! a_channel )
440  {
441  return std::string();
442  }
443 
444  try
445  {
446  LDEBUG( dlog, "Starting to consume messages on queue <" << a_queue_name << ">" );
447  // second bool is setting no_ack to false
448  return a_channel->BasicConsume( a_queue_name, "", true, false );
449  }
450  catch( amqp_exception& e )
451  {
452  LERROR( dlog, "AMQP exception caught while starting consuming messages on <" << a_queue_name << ">: (" << e.reply_code() << ") " << e.reply_text() );
453  return std::string();
454  }
455  catch( amqp_lib_exception& e )
456  {
457  LERROR( dlog, "AMQP library exception caught while starting consuming messages on <" << a_queue_name << ">: (" << e.ErrorCode() << ") " << e.what() );
458  return std::string();
459  }
460  }
461 
462  bool core::stop_consuming( amqp_channel_ptr a_channel, std::string& a_consumer_tag )
463  {
464  if( s_offline || ! a_channel )
465  {
466  return false;
467  }
468 
469  try
470  {
471  LDEBUG( dlog, "Stopping consuming messages for consumer <" << a_consumer_tag << ">" );
472  a_channel->BasicCancel( a_consumer_tag );
473  a_consumer_tag.clear();
474  return true;
475  }
476  catch( amqp_exception& e )
477  {
478  LERROR( dlog, "AMQP exception caught while stopping consuming messages on <" << a_consumer_tag << ">: (" << e.reply_code() << ") " << e.reply_text() );
479  return false;
480  }
481  catch( amqp_lib_exception& e )
482  {
483  LERROR( dlog, "AMQP library exception caught while stopping consuming messages on <" << a_consumer_tag << ">: (" << e.ErrorCode() << ") " << e.what() );
484  return false;
485  }
486  catch( AmqpClient::ConsumerTagNotFoundException& e )
487  {
488  LERROR( dlog, "Fatal AMQP exception encountered while stopping consuming messages on <" << a_consumer_tag << ">: " << e.what() );
489  return false;
490  }
491  catch( std::exception& e )
492  {
493  LERROR( dlog, "Standard exception caught while stopping consuming messages on <" << a_consumer_tag << ">: " << e.what() );
494  return false;
495  }
496  }
497 
498  bool core::remove_queue( amqp_channel_ptr a_channel, const std::string& a_queue_name )
499  {
500  if( s_offline || ! a_channel )
501  {
502  return false;
503  }
504 
505  try
506  {
507  LDEBUG( dlog, "Deleting queue <" << a_queue_name << ">" );
508  a_channel->DeleteQueue( a_queue_name, false );
509  return true;
510  }
511  catch( AmqpClient::ConnectionClosedException& e )
512  {
513  LERROR( dlog, "Fatal AMQP exception encountered removing queue <" << a_queue_name << ">: " << e.what() );
514  return false;
515  }
516  catch( amqp_lib_exception& e )
517  {
518  LERROR( dlog, "AMQP library exception caught while removing queue <" << a_queue_name << ">: (" << e.ErrorCode() << ") " << e.what() );
519  return false;
520  }
521  catch( std::exception& e )
522  {
523  LERROR( dlog, "Standard exception caught while removing queue <" << a_queue_name << ">: " << e.what() );
524  return false;
525  }
526  }
527 
528  void core::listen_for_message( amqp_envelope_ptr& a_envelope, core::post_listen_status& a_status, amqp_channel_ptr a_channel, const std::string& a_consumer_tag, int a_timeout_ms, bool a_do_ack )
529  {
530  if( s_offline || ! a_channel )
531  {
533  return;
534  }
535 
536  while( true )
537  {
538  try
539  {
540  if( a_timeout_ms > 0 )
541  {
542  a_channel->BasicConsumeMessage( a_consumer_tag, a_envelope, a_timeout_ms );
543  }
544  else
545  {
546  a_envelope = a_channel->BasicConsumeMessage( a_consumer_tag );
547  }
548  if( a_envelope )
549  {
550  if( a_do_ack ) a_channel->BasicAck( a_envelope );
552  }
553  else
554  {
555  a_status = post_listen_status::timeout;
556  }
557  return;
558  }
559  catch( AmqpClient::ConnectionClosedException& e )
560  {
561  LERROR( dlog, "Fatal AMQP exception encountered: " << e.what() );
563  return;
564  }
565  catch( AmqpClient::ConsumerCancelledException& e )
566  {
567  LERROR( dlog, "Fatal AMQP exception encountered: " << e.what() );
569  return;
570  }
571  catch( AmqpClient::AmqpException& e )
572  {
573  if( e.is_soft_error() )
574  {
575  LWARN( dlog, "Non-fatal AMQP exception encountered: " << e.reply_text() );
577  return;
578  }
579  LERROR( dlog, "Fatal AMQP exception encountered: " << e.reply_text() );
581  return;
582  }
583  catch( std::exception& e )
584  {
585  LERROR( dlog, "Standard exception caught: " << e.what() );
587  return;
588  }
589  catch(...)
590  {
591  LERROR( dlog, "Unknown exception caught" );
593  return;
594  }
595  }
596  }
597 
598 } /* namespace dripline */
599 
Error indicating a problem with the connection to the broker.
static bool s_offline
Definition: core.hh:77
static bool bind_key(amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
Definition: core.cc:411
sent_msg_pkg_ptr do_send(message_ptr_t a_message, const std::string &a_exchange, bool a_expect_reply, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
Definition: core.cc:185
static bool setup_exchange(amqp_channel_ptr a_channel, const std::string &a_exchange)
Definition: core.cc:360
virtual sent_msg_pkg_ptr send(request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
Definition: core.cc:152
static void listen_for_message(amqp_envelope_ptr &a_envelope, post_listen_status &a_status, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
listen for a single AMQP message
Definition: core.cc:528
post_listen_status
Definition: core.hh:80
@ message_received
A message was received, and the channel is still valid.
@ timeout
A timeout occurred, and the channel is still valid.
@ unknown
Initialized or unknown status.
@ soft_error
An error occurred, but the channel should still be valid.
@ hard_error
An error occurred, and the channel is no longer valid.
static std::string start_consuming(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:437
core(const scarab::param_node &a_config=dripline_config(), const scarab::authentication &a_auth=scarab::authentication(), const bool a_make_connection=true)
Definition: core.cc:51
static bool stop_consuming(amqp_channel_ptr a_channel, std::string &a_consumer_tag)
Definition: core.cc:462
amqp_channel_ptr open_channel() const
Definition: core.cc:277
static bool setup_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:385
static bool remove_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:498
Sets the default configuration used by core. These parameters pertain to the dripline mesh that will ...
Dripline-specific errors.
bool check_and_add_return_code(unsigned a_value, const std::string &a_name, const std::string &a_description)
Definition: return_codes.cc:97
AmqpClient::Channel::ptr_t amqp_channel_ptr
Definition: amqp.hh:24
AmqpClient::AmqpLibraryException amqp_lib_exception
Definition: amqp.hh:29
std::vector< amqp_message_ptr > amqp_split_message_ptrs
Definition: amqp.hh:31
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
std::string to_string(op_t an_op)
Gives the human-readable version of a message operation.
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
AmqpClient::BasicMessage::ptr_t amqp_message_ptr
Definition: amqp.hh:26
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
Definition: amqp.hh:25
AmqpClient::AmqpException amqp_exception
Definition: amqp.hh:28
amqp_channel_ptr f_channel
Definition: core.hh:42
std::string f_consumer_tag
Definition: core.hh:43