8 #define DRIPLINE_API_EXPORTS
16 #include "authentication.hh"
19 using scarab::authentication;
20 using scarab::param_node;
21 using scarab::param_value;
22 using scarab::param_ptr_t;
31 service::service(
const scarab::param_node& a_config,
const scarab::authentication& a_auth,
const bool a_make_connection ) :
33 core( a_config.has(
"dripline_mesh") ? a_config[
"dripline_mesh"].as_node() :
dripline_config(),
34 a_auth, a_make_connection ),
35 endpoint( a_config.get_value(
"name",
"dlcpp_service" ) ),
40 f_status(
status::nothing ),
41 f_restart_on_error( a_config.get_value(
"restart_on_error", true ) ),
42 f_enable_scheduling( a_config.get_value(
"enable_scheduling", false ) ),
46 f_broadcast_key( a_config.get_value(
"broadcast_key",
"broadcast" ) )
48 LDEBUG(
dlog,
"Service (cpp) created with config:\n" << a_config );
51 f_listen_timeout_ms = a_config.get_value(
"loop_timeout_ms", f_listen_timeout_ms );
52 heartbeater::f_check_timeout_ms = f_listen_timeout_ms;
54 f_single_message_wait_ms = a_config.get_value(
"message_wait_ms", f_single_message_wait_ms );
56 f_heartbeat_interval_s = a_config.get_value(
"heartbeat_interval_s", f_heartbeat_interval_s );
82 std::this_thread::sleep_for( std::chrono::milliseconds(1100) );
89 cancelable::operator=( std::move(a_orig) );
96 f_status = std::move( a_orig.f_status );
97 f_restart_on_error = a_orig.f_restart_on_error;
98 f_enable_scheduling = a_orig.f_enable_scheduling;
99 f_id = std::move( a_orig.f_id );
100 f_sync_children = std::move( a_orig.f_sync_children );
101 f_async_children = std::move( a_orig.f_async_children );
102 f_broadcast_key = std::move( a_orig.f_broadcast_key );
109 auto t_inserted = f_sync_children.insert( std::make_pair( a_endpoint_ptr->name(), a_endpoint_ptr ) );
110 if( t_inserted.second )
112 a_endpoint_ptr->set_service(
this );
116 LERROR(
dlog,
"Endpoint <" << a_endpoint_ptr->name() <<
" could not be added to service <" << f_name <<
">" );
119 return t_inserted.second;
124 lr_ptr_t t_listener_receiver_ptr = std::dynamic_pointer_cast< listener_receiver >( a_endpoint_ptr );
125 if( ! t_listener_receiver_ptr )
129 auto t_inserted = f_async_children.insert( std::make_pair( a_endpoint_ptr->name(), t_listener_receiver_ptr ) );
130 if( t_inserted.second )
132 a_endpoint_ptr->set_service(
this );
136 LERROR(
dlog,
"Endpoint (async) <" << a_endpoint_ptr->name() <<
" could not be added to service <" << f_name <<
">" );
139 return t_inserted.second;
144 unsigned n_failures = 0;
145 bool t_do_repeat =
true;
150 LINFO(
dlog,
"Starting the service" );
151 if( !
start() )
throw dripline_error() <<
"There was a problem while starting the service (check for prior error messages)";
154 LINFO(
dlog,
"Service started; now listening for messages" );
155 if( !
listen() )
throw dripline_error() <<
"There was a problem while listening for messages (check for prior error messages)";
165 if( f_restart_on_error && n_failures < 2 )
169 LWARN(
dlog, e.what() );
170 LWARN(
dlog,
"Will attempt to reconnect" );
175 LERROR(
dlog,
"Reached maximum number of reconnect attempts" );
186 LINFO(
dlog,
"Stopping the service" );
187 if( !
stop() )
throw dripline_error() <<
"There was a problem while stopping the service (check for prior error messages)";
194 if( ! f_make_connection )
196 LWARN(
dlog,
"Should not start service when make_connection is disabled" );
201 LERROR(
dlog,
"Service requires a queue name to be started" );
206 endpoint::f_service =
this;
207 heartbeater::f_service =
this;
209 LINFO(
dlog,
"Connecting to <" << f_address <<
":" << f_port <<
">" );
214 if( !
setup_exchange( f_channel, f_requests_exchange ) )
return false;
215 if( !
setup_exchange( f_channel, f_alerts_exchange ) )
return false;
232 if ( ! f_make_connection )
234 LWARN(
dlog,
"Should not listen for messages when make_connection is disabled" );
242 if( f_heartbeat_interval_s != 0 )
244 LINFO(
dlog,
"Starting heartbeat" );
249 LINFO(
dlog,
"Heartbeat disabled" );
252 if( f_enable_scheduling )
254 LINFO(
dlog,
"Starting scheduler" );
259 LINFO(
dlog,
"Scheduler disabled" );
262 LINFO(
dlog,
"Starting receiver thread" );
266 bool t_listen_error =
false;
267 auto t_cancel_on_listen_error = [&t_listen_error,
this](
listener& a_listener) {
268 if( ! a_listener.listen_on_queue() )
270 t_listen_error =
true;
271 this->cancel( RETURN_ERROR );
275 if( ! f_async_children.empty() ) { LINFO(
dlog,
"Starting async children" ); }
276 else { LDEBUG(
dlog,
"No async children to start" ); }
277 for( async_map_t::iterator t_child_it = f_async_children.begin();
278 t_child_it != f_async_children.end();
282 t_child_it->second->listener_thread() = std::thread( t_cancel_on_listen_error, std::ref(*t_child_it->second.get()) );
285 LINFO(
dlog,
"Starting listener thread" );
286 t_cancel_on_listen_error( *
this );
288 for( async_map_t::iterator t_child_it = f_async_children.begin();
289 t_child_it != f_async_children.end();
292 t_child_it->second->listener_thread().join();
293 t_child_it->second->receiver_thread().join();
296 f_receiver_thread.join();
307 if( t_listen_error)
throw dripline_error() <<
"Something went wrong while listening for messages";
309 catch( std::system_error& e )
311 LERROR(
dlog,
"Could not start the a thread due to a system error: " << e.what() );
316 LERROR(
dlog,
"Dripline error while running service: " << e.what() );
319 catch( std::exception& e )
321 LERROR(
dlog,
"Error while running service: " << e.what() );
330 LINFO(
dlog,
"Stopping service on <" << f_name <<
">" );
355 LDEBUG(
dlog,
"Opening channel for service <" << f_name <<
">" );
357 if( ! f_channel )
return false;
359 for( async_map_t::iterator t_child_it = f_async_children.begin();
360 t_child_it != f_async_children.end();
363 LDEBUG(
dlog,
"Opening channel for child <" << t_child_it->first <<
">" );
365 t_child_it->second->set_listen_timeout_ms( f_listen_timeout_ms );
372 LDEBUG(
dlog,
"Setting up queue for service <" << f_name <<
">" );
373 if( !
setup_queue( f_channel, f_name ) )
return false;
375 for( async_map_t::iterator t_child_it = f_async_children.begin();
376 t_child_it != f_async_children.end();
379 LDEBUG(
dlog,
"Setting up queue for async child <" << t_child_it->first <<
">" );
380 if( !
setup_queue( t_child_it->second->channel(), t_child_it->first ) )
return false;
388 LDEBUG(
dlog,
"Binding primary service keys" );
389 if( !
bind_key( f_channel, f_requests_exchange, f_name, f_name +
".#" ) )
return false;
390 if( !
bind_key( f_channel, f_requests_exchange, f_name, f_broadcast_key +
".#" ) )
return false;
392 LDEBUG(
dlog,
"Binding keys for synchronous children" );
393 for( sync_map_t::const_iterator t_child_it = f_sync_children.begin();
394 t_child_it != f_sync_children.end();
397 if( !
bind_key( f_channel, f_requests_exchange, f_name, t_child_it->first +
".#" ) )
return false;
400 LDEBUG(
dlog,
"Binding keys for asynchronous children" );
401 for( async_map_t::iterator t_child_it = f_async_children.begin();
402 t_child_it != f_async_children.end();
405 if( !
bind_key( t_child_it->second->channel(), f_requests_exchange, t_child_it->first, t_child_it->first +
".#" ) )
return false;
414 if( f_consumer_tag.empty() )
return false;
416 for( async_map_t::iterator t_child_it = f_async_children.begin();
417 t_child_it != f_async_children.end();
420 t_child_it->second->consumer_tag() =
core::start_consuming( t_child_it->second->channel(), t_child_it->first );
421 if( t_child_it->second->consumer_tag().empty() )
return false;
429 bool t_success =
true;
431 for( async_map_t::iterator t_child_it = f_async_children.begin();
432 t_child_it != f_async_children.end();
435 t_success =
core::stop_consuming( t_child_it->second->channel(), t_child_it->second->consumer_tag() );
443 bool t_success =
true;
445 for( async_map_t::iterator t_child_it = f_async_children.begin();
446 t_child_it != f_async_children.end();
456 LINFO(
dlog,
"Listening for incoming messages on <" << f_name <<
">" );
458 while( ! is_canceled() )
464 if( f_canceled.load() )
466 LDEBUG(
dlog,
"Service canceled" );
478 LWARN(
dlog,
"A soft error ocurred while listening for messages for <" << f_name <<
">. The channel is still valid" );
484 LERROR(
dlog,
"A hard error ocurred while listening for messages for <" << f_name <<
">. The channel is no longer valid" );
490 LERROR(
dlog,
"An unknown status occurred while listening for messages for <" << f_name <<
">" );
500 if( f_canceled.load() )
502 LDEBUG(
dlog,
"Service <" << f_name <<
"> canceled" );
520 LERROR(
dlog,
"<" << f_name <<
"> Dripline exception caught while handling message: " << e.what() );
525 LERROR(
dlog,
"<" << f_name <<
"> AMQP exception caught while handling message: (" << e.reply_code() <<
") " << e.reply_text() );
530 LERROR(
dlog,
"<" << f_name <<
"> AMQP Library Exception caught while handling message: (" << e.ErrorCode() <<
") " << e.what() );
533 catch( std::exception& e )
535 LERROR(
dlog,
"<" << f_name <<
"> Standard exception caught while handling message: " << e.what() );
544 LDEBUG(
dlog,
"Sending reply message to <" << a_reply->routing_key() <<
">:\n" <<
545 " Return code: " << a_reply->get_return_code() <<
'\n' <<
546 " Return message: " << a_reply->return_message() <<
'\n' <<
547 " Payload:\n" << a_reply->payload() );
549 if( !
send( a_reply ) )
551 LWARN(
dlog,
"Something went wrong while sending the reply" );
558 std::string t_first_token( a_request->routing_key() );
559 t_first_token = t_first_token.substr( 0, t_first_token.find_first_of(
'.') );
560 LDEBUG(
dlog,
"First token in routing key: <" << t_first_token <<
">" );
562 if( t_first_token == f_name || t_first_token == f_broadcast_key )
569 auto t_endpoint_itr = f_sync_children.find( t_first_token );
570 if( t_endpoint_itr == f_sync_children.end() )
572 LERROR(
dlog,
"Did not find child endpoint called <" << t_first_token <<
">" );
573 throw dripline_error() <<
"Did not find child endpoint <" << t_first_token <<
">";
577 return t_endpoint_itr->second->on_request_message( a_request );
583 LDEBUG(
dlog,
"Canceling service <" << f_name <<
">" );
584 for( async_map_t::iterator t_child_it = f_async_children.begin();
585 t_child_it != f_async_children.end();
588 LDEBUG(
dlog,
"Canceling child endpoint <" << t_child_it->first <<
">" );
589 t_child_it->second->cancel( a_code );
void execute()
Handles messages that appear in the concurrent queue by calling submit_message().
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
static bool bind_key(amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
static bool setup_exchange(amqp_channel_ptr a_channel, const std::string &a_exchange)
static void listen_for_message(amqp_envelope_ptr &a_envelope, post_listen_status &a_status, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
listen for a single AMQP message
@ timeout
A timeout occurred, and the channel is still valid.
@ unknown
Initialized or unknown status.
@ soft_error
An error occurred, but the channel should still be valid.
@ hard_error
An error occurred, and the channel is no longer valid.
static std::string start_consuming(amqp_channel_ptr a_channel, const std::string &a_queue_name)
core & operator=(const core &a_orig)=default
static bool stop_consuming(amqp_channel_ptr a_channel, std::string &a_consumer_tag)
amqp_channel_ptr open_channel() const
static bool setup_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
static bool remove_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Sets the default configuration used by core. These parameters pertain to the dripline mesh that will ...
Dripline-specific errors.
Decorator class for a plain endpoint: adds listener_receiver capabilities.
Basic Dripline object capable of receiving and acting on messages.
virtual reply_ptr_t on_request_message(const request_ptr_t a_request)
Default request handler; passes request to initial request functions.
void sort_message(const message_ptr_t a_request)
endpoint & operator=(const endpoint &a_orig)=default
A heartbeater repeatedly sends an alert on a particular time interval.
heartbeater & operator=(const heartbeater &)=delete
void execute(const std::string &a_name, uuid_t a_id, const std::string &a_routing_key)
std::thread f_heartbeat_thread
Convenience class to bring together listener and concurrent_receiver.
listener_receiver & operator=(const listener_receiver &)=delete
A listener is a class capable of listening for AMQP messages on an AMQP channel. This class provides ...
void handle_message_chunk(amqp_envelope_ptr a_envelope)
Executes scheduled events.
void execute()
Main execution loop for the scheduler.
scheduler & operator=(const scheduler &)=delete
std::thread f_scheduler_thread
Primary unit of software that connects to a broker and typically provides an interface with an instru...
virtual reply_ptr_t on_request_message(const request_ptr_t a_request)
Default request handler; passes request to initial request functions.
virtual void send_reply(reply_ptr_t a_reply) const
Sends a reply message.
bool add_async_child(endpoint_ptr_t a_endpoint_ptr)
Add an asynchronous child endpoint.
virtual sent_msg_pkg_ptr send(request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
Sends a request message and returns a channel on which to listen for a reply.
virtual bool open_channels()
virtual bool setup_queues()
virtual bool stop_consuming()
virtual bool remove_queue()
bool add_child(endpoint_ptr_t a_endpoint_ptr)
Add a synchronous child endpoint.
service(const scarab::param_node &a_config=service_config(), const scarab::authentication &a_auth=create_auth_with_dripline(true), const bool a_make_connection=true)
service & operator=(const service &)=delete
virtual void submit_message(message_ptr_t a_message)
Implementation of submit_message (from concurrent_receiver)
virtual bool start_consuming()
virtual bool listen_on_queue()
virtual void do_cancellation(int a_code)
AmqpClient::AmqpLibraryException amqp_lib_exception
std::shared_ptr< message > message_ptr_t
std::shared_ptr< msg_reply > reply_ptr_t
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
std::shared_ptr< msg_request > request_ptr_t
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
AmqpClient::AmqpException amqp_exception
std::shared_ptr< endpoint > endpoint_ptr_t
std::shared_ptr< listener_receiver > lr_ptr_t