Dripline-Cpp  v2.10.11
Dripline Implementation in C++
service.cc
Go to the documentation of this file.
1 /*
2  * service.cc
3  *
4  * Created on: Jan 5, 2016
5  * Author: nsoblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "service.hh"
11 
12 #include "dripline_config.hh"
13 #include "dripline_exceptions.hh"
14 #include "service_config.hh"
15 
16 #include "authentication.hh"
17 #include "logger.hh"
18 
19 using scarab::authentication;
20 using scarab::param_node;
21 using scarab::param_value;
22 using scarab::param_ptr_t;
23 
24 using std::string;
25 using std::set;
26 
27 namespace dripline
28 {
29  LOGGER( dlog, "service" );
30 
31  service::service( const scarab::param_node& a_config, const scarab::authentication& a_auth, const bool a_make_connection ) :
32  scarab::cancelable(),
33  core( a_config.has("dripline_mesh") ? a_config["dripline_mesh"].as_node() : dripline_config(),
34  a_auth, a_make_connection ),
35  endpoint( a_config.get_value( "name", "dlcpp_service" ) ),
37  heartbeater( this ),
38  scheduler<>(),
39  f_auth( a_auth ),
40  f_status( status::nothing ),
41  f_restart_on_error( a_config.get_value( "restart_on_error", true ) ),
42  f_enable_scheduling( a_config.get_value( "enable_scheduling", false ) ),
43  f_id( generate_random_uuid() ),
44  f_sync_children(),
45  f_async_children(),
46  f_broadcast_key( a_config.get_value( "broadcast_key", "broadcast" ) )
47  {
48  LDEBUG( dlog, "Service (cpp) created with config:\n" << a_config );
49  // get more values from the config
50  // default of f_listen_timeout_ms is in the listener class
51  f_listen_timeout_ms = a_config.get_value( "loop_timeout_ms", f_listen_timeout_ms );
52  heartbeater::f_check_timeout_ms = f_listen_timeout_ms;
53  // default of f_single_message_wait_ms is in the receiver class
54  f_single_message_wait_ms = a_config.get_value( "message_wait_ms", f_single_message_wait_ms );
55  // default of f_heartbeat_interval_s is in the heartbeater class
56  f_heartbeat_interval_s = a_config.get_value( "heartbeat_interval_s", f_heartbeat_interval_s );
57  }
58 /*
59  service::service( const bool a_make_connection, const scarab::param_node& a_config, const scarab::authentication& a_auth ) :
60  scarab::cancelable(),
61  core( a_make_connection, a_config ),
62  endpoint( "" ),
63  listener_receiver(),
64  heartbeater(),
65  scheduler<>(),
66  std::enable_shared_from_this< service >(),
67  f_status( status::nothing ),
68  f_enable_scheduling( a_config.get_value("enable-scheduling", false ) ),
69  f_id( generate_random_uuid() ),
70  f_sync_children(),
71  f_async_children(),
72  f_broadcast_key()
73  {
74  }
75 */
76 
78  {
79  if( f_status >= status::listening )
80  {
81  this->cancel( dl_success().rc_value() );
82  std::this_thread::sleep_for( std::chrono::milliseconds(1100) );
83  }
84  if( f_status > status::exchange_declared ) stop();
85  }
86 
88  {
89  cancelable::operator=( std::move(a_orig) );
90  core::operator=( std::move(a_orig) );
91  endpoint::operator=( std::move(a_orig));
92  listener_receiver::operator=( std::move(a_orig) );
93  heartbeater::operator=( std::move(a_orig) );
94  scheduler<>::operator=( std::move(a_orig) );
95 
96  f_status = std::move( a_orig.f_status );
97  f_restart_on_error = a_orig.f_restart_on_error;
98  f_enable_scheduling = a_orig.f_enable_scheduling;
99  f_id = std::move( a_orig.f_id );
100  f_sync_children = std::move( a_orig.f_sync_children );
101  f_async_children = std::move( a_orig.f_async_children );
102  f_broadcast_key = std::move( a_orig.f_broadcast_key );
103 
104  return *this;
105  }
106 
107  bool service::add_child( endpoint_ptr_t a_endpoint_ptr )
108  {
109  auto t_inserted = f_sync_children.insert( std::make_pair( a_endpoint_ptr->name(), a_endpoint_ptr ) );
110  if( t_inserted.second )
111  {
112  a_endpoint_ptr->set_service( this );
113  }
114  else
115  {
116  LERROR( dlog, "Endpoint <" << a_endpoint_ptr->name() << " could not be added to service <" << f_name << ">" );
117  return false;
118  }
119  return t_inserted.second;
120  }
121 
123  {
124  lr_ptr_t t_listener_receiver_ptr = std::dynamic_pointer_cast< listener_receiver >( a_endpoint_ptr );
125  if( ! t_listener_receiver_ptr )
126  {
127  t_listener_receiver_ptr.reset( new endpoint_listener_receiver( a_endpoint_ptr ) );
128  }
129  auto t_inserted = f_async_children.insert( std::make_pair( a_endpoint_ptr->name(), t_listener_receiver_ptr ) );
130  if( t_inserted.second )
131  {
132  a_endpoint_ptr->set_service( this );
133  }
134  else
135  {
136  LERROR( dlog, "Endpoint (async) <" << a_endpoint_ptr->name() << " could not be added to service <" << f_name << ">" );
137  return false;
138  }
139  return t_inserted.second;
140  }
141 
143  {
144  unsigned n_failures = 0;
145  bool t_do_repeat = true; // start true so that we get into the repeat loop
146  // Repeat loop for listening: we may call to listen_on_queue() multiple times
147  while( t_do_repeat )
148  {
149  t_do_repeat = false; // set false because we'll only do the repeat based on the conditions below
150  LINFO( dlog, "Starting the service" );
151  if( ! start() ) throw dripline_error() << "There was a problem while starting the service (check for prior error messages)";
152  try
153  {
154  LINFO( dlog, "Service started; now listening for messages" );
155  if( ! listen() ) throw dripline_error() << "There was a problem while listening for messages (check for prior error messages)";
156  n_failures = 0; // reset the number of failures to 0 once there's been a successful connection
157  }
158  catch( const dripline_error& e )
159  {
160  // We had an error while listening
161  // Check whether or not we should try to connect again
162  // 1. If we want to restart on error, and
163  // 2. If the failure count is less than our threshold (2)
164  ++n_failures;
165  if( f_restart_on_error && n_failures < 2 )
166  {
167  t_do_repeat = true;
168  // we'll report and then drop the exception to do the reconnect
169  LWARN( dlog, e.what() );
170  LWARN( dlog, "Will attempt to reconnect" );
171  }
172  else
173  {
174  // if we're not going to connect again, and we had an error, propagate the error by rethrowing
175  LERROR( dlog, "Reached maximum number of reconnect attempts" );
176  throw;
177  }
178  }
179 
180  if( t_do_repeat )
181  {
182  reset_cancel();
183  }
184  }
185 
186  LINFO( dlog, "Stopping the service" );
187  if( ! stop() ) throw dripline_error() << "There was a problem while stopping the service (check for prior error messages)";
188 
189  return;
190  }
191 
193  {
194  if( ! f_make_connection )
195  {
196  LWARN( dlog, "Should not start service when make_connection is disabled" );
197  return true;
198  }
199  if( f_name.empty() )
200  {
201  LERROR( dlog, "Service requires a queue name to be started" );
202  return false;
203  }
204 
205  // fill in the link to this in endpoint because we couldn't do it in the constructor
206  endpoint::f_service = this;
207  heartbeater::f_service = this;
208 
209  LINFO( dlog, "Connecting to <" << f_address << ":" << f_port << ">" );
210 
211  if( ! open_channels() ) return false;
212  f_status = status::channel_created;
213 
214  if( ! setup_exchange( f_channel, f_requests_exchange ) ) return false;
215  if( ! setup_exchange( f_channel, f_alerts_exchange ) ) return false;
216  f_status = status::exchange_declared;
217 
218  if( ! setup_queues() ) return false;
219  f_status = status::queue_declared;
220 
221  if( ! bind_keys() ) return false;
222  f_status = status::queue_bound;
223 
224  if( ! start_consuming() ) return false;
225  f_status = status::consuming;
226 
227  return true;
228  }
229 
231  {
232  if ( ! f_make_connection )
233  {
234  LWARN( dlog, "Should not listen for messages when make_connection is disabled" );
235  return true;
236  }
237 
238  f_status = status::listening;
239 
240  try
241  {
242  if( f_heartbeat_interval_s != 0 )
243  {
244  LINFO( dlog, "Starting heartbeat" );
245  f_heartbeat_thread = std::thread( &heartbeater::execute, this, f_name, f_id, f_heartbeat_routing_key );
246  }
247  else
248  {
249  LINFO( dlog, "Heartbeat disabled" );
250  }
251 
252  if( f_enable_scheduling )
253  {
254  LINFO( dlog, "Starting scheduler" );
255  f_scheduler_thread = std::thread( &scheduler::execute, this );
256  }
257  else
258  {
259  LINFO( dlog, "Scheduler disabled" );
260  }
261 
262  LINFO( dlog, "Starting receiver thread" );
263  f_receiver_thread = std::thread( &concurrent_receiver::execute, this );
264 
265  // lambda to cancel everything on an error from listener::listen_on_queue()
266  bool t_listen_error = false;
267  auto t_cancel_on_listen_error = [&t_listen_error, this](listener& a_listener) {
268  if( ! a_listener.listen_on_queue() )
269  {
270  t_listen_error = true;
271  this->cancel( RETURN_ERROR );
272  }
273  };
274 
275  if( ! f_async_children.empty() ) { LINFO( dlog, "Starting async children" ); }
276  else { LDEBUG( dlog, "No async children to start" ); }
277  for( async_map_t::iterator t_child_it = f_async_children.begin();
278  t_child_it != f_async_children.end();
279  ++t_child_it )
280  {
281  t_child_it->second->receiver_thread() = std::thread( &concurrent_receiver::execute, static_cast< listener_receiver* >(t_child_it->second.get()) );
282  t_child_it->second->listener_thread() = std::thread( t_cancel_on_listen_error, std::ref(*t_child_it->second.get()) );
283  }
284 
285  LINFO( dlog, "Starting listener thread" );
286  t_cancel_on_listen_error( *this );
287 
288  for( async_map_t::iterator t_child_it = f_async_children.begin();
289  t_child_it != f_async_children.end();
290  ++t_child_it )
291  {
292  t_child_it->second->listener_thread().join();
293  t_child_it->second->receiver_thread().join();
294  }
295 
296  f_receiver_thread.join();
297 
298  if( f_heartbeat_thread.joinable() )
299  {
300  f_heartbeat_thread.join();
301  }
302  if( f_scheduler_thread.joinable() )
303  {
304  f_scheduler_thread.join();
305  }
306 
307  if( t_listen_error) throw dripline_error() << "Something went wrong while listening for messages";
308  }
309  catch( std::system_error& e )
310  {
311  LERROR( dlog, "Could not start the a thread due to a system error: " << e.what() );
312  return false;
313  }
314  catch( dripline_error& e )
315  {
316  LERROR( dlog, "Dripline error while running service: " << e.what() );
317  return false;
318  }
319  catch( std::exception& e )
320  {
321  LERROR( dlog, "Error while running service: " << e.what() );
322  return false;
323  }
324 
325  return true;
326  }
327 
329  {
330  LINFO( dlog, "Stopping service on <" << f_name << ">" );
331 
332  if( f_status >= status::listening ) // listening or processing
333  {
334  this->cancel( dl_success().rc_value() );
335  f_status = status::consuming;
336  }
337  if( f_status >= status::queue_bound ) // queue_bound or consuming
338  {
339  if( ! stop_consuming() ) return false;
340  f_status = status::queue_bound;
341  }
342 
343 
344  if( f_status >= status::queue_declared ) // queue_declared or queue_bound
345  {
346  if( ! remove_queue() ) return false;
347  f_status = status::exchange_declared;
348  }
349 
350  return true;
351  }
352 
354  {
355  LDEBUG( dlog, "Opening channel for service <" << f_name << ">" );
356  f_channel = open_channel();
357  if( ! f_channel ) return false;
358 
359  for( async_map_t::iterator t_child_it = f_async_children.begin();
360  t_child_it != f_async_children.end();
361  ++t_child_it )
362  {
363  LDEBUG( dlog, "Opening channel for child <" << t_child_it->first << ">" );
364  t_child_it->second->channel() = open_channel();
365  t_child_it->second->set_listen_timeout_ms( f_listen_timeout_ms );
366  }
367  return true;
368  }
369 
371  {
372  LDEBUG( dlog, "Setting up queue for service <" << f_name << ">" );
373  if( ! setup_queue( f_channel, f_name ) ) return false;
374 
375  for( async_map_t::iterator t_child_it = f_async_children.begin();
376  t_child_it != f_async_children.end();
377  ++t_child_it )
378  {
379  LDEBUG( dlog, "Setting up queue for async child <" << t_child_it->first << ">" );
380  if( ! setup_queue( t_child_it->second->channel(), t_child_it->first ) ) return false;
381  }
382 
383  return true;
384  }
385 
387  {
388  LDEBUG( dlog, "Binding primary service keys" );
389  if( ! bind_key( f_channel, f_requests_exchange, f_name, f_name + ".#" ) ) return false;
390  if( ! bind_key( f_channel, f_requests_exchange, f_name, f_broadcast_key + ".#" ) ) return false;
391 
392  LDEBUG( dlog, "Binding keys for synchronous children" );
393  for( sync_map_t::const_iterator t_child_it = f_sync_children.begin();
394  t_child_it != f_sync_children.end();
395  ++t_child_it )
396  {
397  if( ! bind_key( f_channel, f_requests_exchange, f_name, t_child_it->first + ".#" ) ) return false;
398  }
399 
400  LDEBUG( dlog, "Binding keys for asynchronous children" );
401  for( async_map_t::iterator t_child_it = f_async_children.begin();
402  t_child_it != f_async_children.end();
403  ++t_child_it )
404  {
405  if( ! bind_key( t_child_it->second->channel(), f_requests_exchange, t_child_it->first, t_child_it->first + ".#" ) ) return false;
406  }
407 
408  return true;
409  }
410 
412  {
413  f_consumer_tag = core::start_consuming( f_channel, f_name );
414  if( f_consumer_tag.empty() ) return false;
415 
416  for( async_map_t::iterator t_child_it = f_async_children.begin();
417  t_child_it != f_async_children.end();
418  ++t_child_it )
419  {
420  t_child_it->second->consumer_tag() = core::start_consuming( t_child_it->second->channel(), t_child_it->first );
421  if( t_child_it->second->consumer_tag().empty() ) return false;
422  }
423  return true;
424  }
425 
427  {
428  // doesn't stop on failure; continues trying to stop consuming
429  bool t_success = true;
430  t_success = core::stop_consuming( f_channel, f_consumer_tag );
431  for( async_map_t::iterator t_child_it = f_async_children.begin();
432  t_child_it != f_async_children.end();
433  ++t_child_it )
434  {
435  t_success = core::stop_consuming( t_child_it->second->channel(), t_child_it->second->consumer_tag() );
436  }
437  return t_success;
438  }
439 
441  {
442  // doesn't stop on failure; continues trying to remove queues
443  bool t_success = true;
444  t_success = core::remove_queue( f_channel, f_name );
445  for( async_map_t::iterator t_child_it = f_async_children.begin();
446  t_child_it != f_async_children.end();
447  ++t_child_it )
448  {
449  t_success = core::remove_queue( t_child_it->second->channel(), t_child_it->first );
450  }
451  return t_success;
452  }
453 
455  {
456  LINFO( dlog, "Listening for incoming messages on <" << f_name << ">" );
457 
458  while( ! is_canceled() )
459  {
460  amqp_envelope_ptr t_envelope;
462  core::listen_for_message( t_envelope, t_post_listen_status, f_channel, f_consumer_tag, f_listen_timeout_ms );
463 
464  if( f_canceled.load() )
465  {
466  LDEBUG( dlog, "Service canceled" );
467  return true;
468  }
469 
470  if( t_post_listen_status == core::post_listen_status::timeout )
471  {
472  // we end up here every time the listen times out with no message received
473  continue;
474  }
475 
476  if( t_post_listen_status == core::post_listen_status::soft_error )
477  {
478  LWARN( dlog, "A soft error ocurred while listening for messages for <" << f_name << ">. The channel is still valid" );
479  continue;
480  }
481 
482  if( t_post_listen_status == core::post_listen_status::hard_error )
483  {
484  LERROR( dlog, "A hard error ocurred while listening for messages for <" << f_name << ">. The channel is no longer valid" );
485  return false;
486  }
487 
488  if( t_post_listen_status == core::post_listen_status::unknown )
489  {
490  LERROR( dlog, "An unknown status occurred while listening for messages for <" << f_name << ">" );
491  return false;
492  }
493 
494  // remaining status is core::post_listen_status::message_received
495 
496  f_status = status::processing;
497 
498  handle_message_chunk( t_envelope );
499 
500  if( f_canceled.load() )
501  {
502  LDEBUG( dlog, "Service <" << f_name << "> canceled" );
503  return true;
504  }
505 
506  f_status = status::listening;
507  }
508  return true;
509  }
510 
512  {
513  try
514  {
515  sort_message( a_message );
516  return;
517  }
518  catch( dripline_error& e )
519  {
520  LERROR( dlog, "<" << f_name << "> Dripline exception caught while handling message: " << e.what() );
521  throw;
522  }
523  catch( amqp_exception& e )
524  {
525  LERROR( dlog, "<" << f_name << "> AMQP exception caught while handling message: (" << e.reply_code() << ") " << e.reply_text() );
526  throw;
527  }
528  catch( amqp_lib_exception& e )
529  {
530  LERROR( dlog, "<" << f_name << "> AMQP Library Exception caught while handling message: (" << e.ErrorCode() << ") " << e.what() );
531  throw;
532  }
533  catch( std::exception& e )
534  {
535  LERROR( dlog, "<" << f_name << "> Standard exception caught while handling message: " << e.what() );
536  throw;
537  }
538 
539  return;
540  }
541 
542  void service::send_reply( reply_ptr_t a_reply ) const
543  {
544  LDEBUG( dlog, "Sending reply message to <" << a_reply->routing_key() << ">:\n" <<
545  " Return code: " << a_reply->get_return_code() << '\n' <<
546  " Return message: " << a_reply->return_message() << '\n' <<
547  " Payload:\n" << a_reply->payload() );
548 
549  if( ! send( a_reply ) )
550  {
551  LWARN( dlog, "Something went wrong while sending the reply" );
552  }
553  return;
554  }
555 
557  {
558  std::string t_first_token( a_request->routing_key() );
559  t_first_token = t_first_token.substr( 0, t_first_token.find_first_of('.') );
560  LDEBUG( dlog, "First token in routing key: <" << t_first_token << ">" );
561 
562  if( t_first_token == f_name || t_first_token == f_broadcast_key )
563  {
564  // reply will be sent by endpoint::on_request_message
565  return this->endpoint::on_request_message( a_request );
566  }
567  else
568  {
569  auto t_endpoint_itr = f_sync_children.find( t_first_token );
570  if( t_endpoint_itr == f_sync_children.end() )
571  {
572  LERROR( dlog, "Did not find child endpoint called <" << t_first_token << ">" );
573  throw dripline_error() << "Did not find child endpoint <" << t_first_token << ">";
574  }
575 
576  // reply will be sent by endpoint::on_request_message or derived
577  return t_endpoint_itr->second->on_request_message( a_request );
578  }
579  }
580 
581  void service::do_cancellation( int a_code )
582  {
583  LDEBUG( dlog, "Canceling service <" << f_name << ">" );
584  for( async_map_t::iterator t_child_it = f_async_children.begin();
585  t_child_it != f_async_children.end();
586  ++t_child_it )
587  {
588  LDEBUG( dlog, "Canceling child endpoint <" << t_child_it->first << ">" );
589  t_child_it->second->cancel( a_code );
590  }
591  return;
592  }
593 
594 } /* namespace dripline */
void execute()
Handles messages that appear in the concurrent queue by calling submit_message().
Definition: receiver.cc:429
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
Definition: core.hh:75
static bool bind_key(amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
Definition: core.cc:411
static bool setup_exchange(amqp_channel_ptr a_channel, const std::string &a_exchange)
Definition: core.cc:360
static void listen_for_message(amqp_envelope_ptr &a_envelope, post_listen_status &a_status, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
listen for a single AMQP message
Definition: core.cc:528
post_listen_status
Definition: core.hh:80
@ timeout
A timeout occurred, and the channel is still valid.
@ unknown
Initialized or unknown status.
@ soft_error
An error occurred, but the channel should still be valid.
@ hard_error
An error occurred, and the channel is no longer valid.
static std::string start_consuming(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:437
core & operator=(const core &a_orig)=default
static bool stop_consuming(amqp_channel_ptr a_channel, std::string &a_consumer_tag)
Definition: core.cc:462
amqp_channel_ptr open_channel() const
Definition: core.cc:277
static bool setup_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:385
static bool remove_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:498
Sets the default configuration used by core. These parameters pertain to the dripline mesh that will ...
Dripline-specific errors.
Decorator class for a plain endpoint: adds listener_receiver capabilities.
Definition: listener.hh:106
Basic Dripline object capable of receiving and acting on messages.
Definition: endpoint.hh:97
virtual reply_ptr_t on_request_message(const request_ptr_t a_request)
Default request handler; passes request to initial request functions.
Definition: endpoint.cc:65
void sort_message(const message_ptr_t a_request)
Definition: endpoint.cc:189
endpoint & operator=(const endpoint &a_orig)=default
A heartbeater repeatedly sends an alert on a particular time interval.
Definition: heartbeater.hh:53
heartbeater & operator=(const heartbeater &)=delete
void execute(const std::string &a_name, uuid_t a_id, const std::string &a_routing_key)
Definition: heartbeater.cc:44
std::thread f_heartbeat_thread
Definition: heartbeater.hh:83
Convenience class to bring together listener and concurrent_receiver.
Definition: listener.hh:77
listener_receiver & operator=(const listener_receiver &)=delete
A listener is a class capable of listening for AMQP messages on an AMQP channel. This class provides ...
Definition: listener.hh:48
void handle_message_chunk(amqp_envelope_ptr a_envelope)
Definition: receiver.cc:62
Executes scheduled events.
Definition: scheduler.hh:104
void execute()
Main execution loop for the scheduler.
Definition: scheduler.hh:340
scheduler & operator=(const scheduler &)=delete
std::thread f_scheduler_thread
Definition: scheduler.hh:176
Primary unit of software that connects to a broker and typically provides an interface with an instru...
Definition: service.hh:85
virtual reply_ptr_t on_request_message(const request_ptr_t a_request)
Default request handler; passes request to initial request functions.
Definition: service.cc:556
virtual void run()
Definition: service.cc:142
virtual void send_reply(reply_ptr_t a_reply) const
Sends a reply message.
Definition: service.cc:542
virtual ~service()
Definition: service.cc:77
bool add_async_child(endpoint_ptr_t a_endpoint_ptr)
Add an asynchronous child endpoint.
Definition: service.cc:122
virtual sent_msg_pkg_ptr send(request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
Sends a request message and returns a channel on which to listen for a reply.
Definition: service.hh:227
virtual bool open_channels()
Definition: service.cc:353
virtual bool setup_queues()
Definition: service.cc:370
virtual bool stop_consuming()
Definition: service.cc:426
virtual bool remove_queue()
Definition: service.cc:440
bool add_child(endpoint_ptr_t a_endpoint_ptr)
Add a synchronous child endpoint.
Definition: service.cc:107
service(const scarab::param_node &a_config=service_config(), const scarab::authentication &a_auth=create_auth_with_dripline(true), const bool a_make_connection=true)
Definition: service.cc:31
service & operator=(const service &)=delete
virtual void submit_message(message_ptr_t a_message)
Implementation of submit_message (from concurrent_receiver)
Definition: service.cc:511
virtual bool start_consuming()
Definition: service.cc:411
virtual bool bind_keys()
Definition: service.cc:386
virtual bool listen_on_queue()
Definition: service.cc:454
virtual void do_cancellation(int a_code)
Definition: service.cc:581
AmqpClient::AmqpLibraryException amqp_lib_exception
Definition: amqp.hh:29
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
Definition: uuid.cc:19
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
Definition: amqp.hh:25
AmqpClient::AmqpException amqp_exception
Definition: amqp.hh:28
std::shared_ptr< endpoint > endpoint_ptr_t
Definition: dripline_fwd.hh:39
std::shared_ptr< listener_receiver > lr_ptr_t
Definition: dripline_fwd.hh:33
Definition: agent.hh:18