8 #define DRIPLINE_API_EXPORTS
15 #include "authentication.hh"
16 #include "exponential_backoff.hh"
18 #include "param_codec.hh"
19 #include "signal_handler.hh"
35 LDEBUG(
dlog,
"Stopping consuming messages" );
40 LERROR(
dlog,
"AMQP exception caught while canceling the channel: (" << e.reply_code() <<
") " << e.reply_text() );
44 LERROR(
dlog,
"AMQP library exception caught while canceling the channel: (" << e.ErrorCode() <<
") " << e.what() );
51 core::core(
const scarab::param_node& a_config,
const scarab::authentication& a_auth,
const bool a_make_connection ) :
56 f_requests_exchange(),
58 f_heartbeat_routing_key(),
61 f_max_connection_attempts()
67 t_config.merge( a_config );
68 LDEBUG(
dlog,
"Dripline core being configured with:\n" << t_config );
87 f_address = t_config[
"broker"]().as_string();
88 f_port = t_config[
"broker_port"]().as_uint();
89 f_requests_exchange = t_config[
"requests_exchange"]().as_string();
90 f_alerts_exchange = t_config[
"alerts_exchange"]().as_string();
91 f_heartbeat_routing_key = t_config[
"heartbeat_routing_key"]().as_string();
92 f_make_connection = t_config.get_value(
"make_connection", a_make_connection );
93 f_max_payload_size = t_config[
"max_payload_size"]().as_uint();
94 f_max_connection_attempts = t_config[
"max_connection_attempts"]().as_uint();
96 f_username = a_auth.get(
"dripline",
"username",
"guest");
97 f_password = a_auth.get(
"dripline",
"password",
"guest");
100 if( t_config.has(
"return_codes" ) )
103 auto t_extract_codes = [](
const scarab::param_array& a_codes)
105 LDEBUG(
dlog,
"Adding return codes:\n" << a_codes );
106 for(
auto t_code_it = a_codes.begin(); t_code_it != a_codes.end(); ++t_code_it )
110 const scarab::param_node& t_a_node = t_code_it->as_node();
111 if(
check_and_add_return_code( t_a_node[
"value"]().as_uint(), t_a_node[
"name"]().as_string(), t_a_node[
"description"]().as_string() ) )
113 LDEBUG(
dlog,
"Added return code <" << t_a_node[
"name"]().as_string() <<
" (" << t_a_node[
"value"]().as_uint() <<
")>: " << t_a_node[
"description"]().as_string() );
116 catch(
const scarab::error& e )
118 throw dripline_error() <<
"Invalid configuration for a return code:\n" << *t_code_it <<
'\n' << e.what();
120 catch(
const std::out_of_range& e )
122 throw dripline_error() <<
"Missing configuration parameter for a return code:\n" << *t_code_it;
128 if( t_config[
"return_codes"].is_value() && t_config[
"return_codes"]().is_string() )
131 std::string t_filename( t_config[
"return_codes"]().as_string() );
132 scarab::param_translator t_translator;
133 scarab::param_ptr_t t_ret_codes = t_translator.read_file( t_filename );
134 if( ! t_ret_codes || ! t_ret_codes->is_array() )
136 throw dripline_error() <<
"Could not find or open return-code config file, or the config does not contain an array: " << t_filename;
138 t_extract_codes( t_ret_codes->as_array() );
140 else if( t_config[
"return_codes"].is_array() )
143 t_extract_codes( t_config[
"return_codes"].as_array() );
147 throw dripline_error() <<
"Return code configuration is invalid:\n" << t_config[
"return_codes"];
154 LDEBUG(
dlog,
"Sending request with routing key <" << a_request->routing_key() <<
">" );
160 return do_send( std::static_pointer_cast< message >( a_request ), f_requests_exchange,
true, a_channel );
165 LDEBUG(
dlog,
"Sending reply with routing key <" << a_reply->routing_key() <<
">" );
171 return do_send( std::static_pointer_cast< message >( a_reply ), f_requests_exchange,
false, a_channel );
176 LDEBUG(
dlog,
"Sending alert with routing key <" << a_alert->routing_key() <<
">" );
182 return do_send( std::static_pointer_cast< message >( a_alert ), f_alerts_exchange,
false, a_channel );
194 auto t_diagnostic_string_maker = [a_message,
this]() -> std::string {
195 return std::string(
"Broker: ") + f_address +
"\nPort: " +
std::to_string(f_port) +
"\nRouting Key: " + a_message->routing_key();
201 throw connection_error() <<
"Unable to open channel to send message\n" << t_diagnostic_string_maker();
206 throw dripline_error() <<
"Unable to setup the exchange <" << a_exchange <<
"> to send message\n" << t_diagnostic_string_maker();
211 std::unique_lock< std::mutex > t_rr_lock( t_receive_reply->f_mutex );
215 t_receive_reply->f_channel = t_channel;
218 std::string t_reply_to = t_channel->DeclareQueue(
"" );
219 t_channel->BindQueue( t_reply_to, a_exchange, t_reply_to );
221 a_message->reply_to() = t_reply_to;
224 t_receive_reply->f_consumer_tag = t_channel->BasicConsume( t_reply_to );
225 LDEBUG(
dlog,
"Reply-to for request: " << t_reply_to );
226 LDEBUG(
dlog,
"Consumer tag for reply: " << t_receive_reply->f_consumer_tag );
231 if( t_amqp_messages.empty() )
233 throw dripline_error() <<
"Unable to convert the dripline::message object to AMQP message(s) to be sent\n" << t_diagnostic_string_maker();
238 LDEBUG(
dlog,
"Sending message to <" << a_message->routing_key() <<
">" );
244 t_channel->BasicPublish( a_exchange, a_message->routing_key(), t_amqp_message, a_message->is_request(),
false );
246 LDEBUG(
dlog,
"Message sent in " << t_amqp_messages.size() <<
" chunks" );
247 t_receive_reply->f_successful_send =
true;
248 t_receive_reply->f_send_error_message.clear();
250 catch( AmqpClient::ConnectionClosedException& e )
252 LERROR(
dlog,
"Unable to send message because the connection is closed: " << e.what() );
253 throw connection_error() <<
"Unable to send message because the connection is closed: " << e.what() <<
'\n' << t_diagnostic_string_maker();
255 catch( AmqpClient::AmqpLibraryException& e )
257 LERROR(
dlog,
"AMQP error while sending message: " << e.what() );
258 t_receive_reply->f_successful_send =
false;
259 t_receive_reply->f_send_error_message = std::string(
"AMQP error while sending message: ") + std::string(e.what()) +
'\n' + t_diagnostic_string_maker();
261 catch( AmqpClient::MessageReturnedException& e )
263 LERROR(
dlog,
"Message was returned: " << e.what() );
264 t_receive_reply->f_successful_send =
false;
265 t_receive_reply->f_send_error_message = std::string(
"Message was returned: ") + std::string(e.what()) +
'\n' + t_diagnostic_string_maker();
267 catch( std::exception& e )
269 LERROR(
dlog,
"Error while sending message: " << e.what() );
270 t_receive_reply->f_successful_send =
false;
271 t_receive_reply->f_send_error_message = std::string(
"Error while sending message: ") + std::string(e.what()) +
'\n' + t_diagnostic_string_maker();
274 return t_receive_reply;
295 auto t_open_conn_fcn = [&]()->
bool
299 LINFO(
dlog,
"Opening AMQP connection and creating channel to " << f_address <<
":" << f_port );
300 LDEBUG(
dlog,
"Using broker authentication: " << f_username <<
":" << f_password );
301 struct AmqpClient::Channel::OpenOpts opts;
302 opts.host = f_address;
304 opts.auth = AmqpClient::Channel::OpenOpts::BasicAuth(f_username, f_password);
305 t_ret_ptr = AmqpClient::Channel::Open( opts );
310 if( e.is_soft_error() )
312 LWARN(
dlog,
"Recoverable AMQP exception caught while opening channel: (" << e.reply_code() <<
") " << e.reply_text() );
320 LERROR(
dlog,
"AMQP Library Exception caught while creating channel: (" << e.ErrorCode() <<
") " << e.what() );
321 if( e.ErrorCode() == -9 )
323 LERROR(
dlog,
"This error means the client could not connect to the broker.\n" <<
324 "Check that you have the address and port correct, and that the broker is running.")
331 scarab::exponential_backoff<> t_open_conn_backoff( t_open_conn_fcn, f_max_connection_attempts );
332 auto t_exp_cancel_wrap = wrap_cancelable( t_open_conn_backoff );
333 scarab::signal_handler::add_cancelable( t_exp_cancel_wrap );
335 int t_expback_return = 0;
338 LDEBUG(
dlog,
"Attempting to open channel; will make up to " << f_max_connection_attempts <<
" attempts" );
339 t_expback_return = t_open_conn_backoff.go();
344 LERROR(
dlog,
"Unrecoverable AMQP exception caught while opening channel: (" << e.reply_code() <<
") " << e.reply_text() );
346 catch(
const std::exception& e)
349 LERROR(
dlog,
"Standard exception caught while creating channel: " << e.what() );
352 if( t_expback_return == 0 )
354 LERROR(
dlog,
"Failed to open a channel; no more attempts will be made" );
369 LDEBUG(
dlog,
"Declaring exchange <" << a_exchange <<
">" );
370 a_channel->DeclareExchange( a_exchange, AmqpClient::Channel::EXCHANGE_TYPE_TOPIC,
false,
false,
false );
375 LERROR(
dlog,
"AMQP exception caught while declaring exchange: (" << e.reply_code() <<
") " << e.reply_text() );
380 LERROR(
dlog,
"AMQP library exception caught while declaring exchange: (" << e.ErrorCode() <<
") " << e.what() );
394 LDEBUG(
dlog,
"Declaring queue <" << a_queue_name <<
">" );
395 a_channel->DeclareQueue( a_queue_name,
false,
false,
true,
true );
400 LERROR(
dlog,
"AMQP exception caught while declaring queue: (" << e.reply_code() <<
") " << e.reply_text() );
405 LERROR(
dlog,
"AMQP library exception caught while declaring queue: (" << e.ErrorCode() <<
") " << e.what() );
420 LDEBUG(
dlog,
"Binding key <" << a_routing_key <<
"> to queue <" << a_queue_name <<
"> over exchange <" << a_exchange <<
">" );
421 a_channel->BindQueue( a_queue_name, a_exchange, a_routing_key );
427 LERROR(
dlog,
"AMQP exception caught while declaring binding key <" << a_routing_key <<
">: (" << e.reply_code() <<
") " << e.reply_text() );
432 LERROR(
dlog,
"AMQP library exception caught while binding key <" << a_routing_key <<
">: (" << e.ErrorCode() <<
") " << e.what() );
441 return std::string();
446 LDEBUG(
dlog,
"Starting to consume messages on queue <" << a_queue_name <<
">" );
448 return a_channel->BasicConsume( a_queue_name,
"",
true,
false );
452 LERROR(
dlog,
"AMQP exception caught while starting consuming messages on <" << a_queue_name <<
">: (" << e.reply_code() <<
") " << e.reply_text() );
453 return std::string();
457 LERROR(
dlog,
"AMQP library exception caught while starting consuming messages on <" << a_queue_name <<
">: (" << e.ErrorCode() <<
") " << e.what() );
458 return std::string();
471 LDEBUG(
dlog,
"Stopping consuming messages for consumer <" << a_consumer_tag <<
">" );
472 a_channel->BasicCancel( a_consumer_tag );
473 a_consumer_tag.clear();
478 LERROR(
dlog,
"AMQP exception caught while stopping consuming messages on <" << a_consumer_tag <<
">: (" << e.reply_code() <<
") " << e.reply_text() );
483 LERROR(
dlog,
"AMQP library exception caught while stopping consuming messages on <" << a_consumer_tag <<
">: (" << e.ErrorCode() <<
") " << e.what() );
486 catch( AmqpClient::ConsumerTagNotFoundException& e )
488 LERROR(
dlog,
"Fatal AMQP exception encountered while stopping consuming messages on <" << a_consumer_tag <<
">: " << e.what() );
491 catch( std::exception& e )
493 LERROR(
dlog,
"Standard exception caught while stopping consuming messages on <" << a_consumer_tag <<
">: " << e.what() );
507 LDEBUG(
dlog,
"Deleting queue <" << a_queue_name <<
">" );
508 a_channel->DeleteQueue( a_queue_name,
false );
511 catch( AmqpClient::ConnectionClosedException& e )
513 LERROR(
dlog,
"Fatal AMQP exception encountered removing queue <" << a_queue_name <<
">: " << e.what() );
518 LERROR(
dlog,
"AMQP library exception caught while removing queue <" << a_queue_name <<
">: (" << e.ErrorCode() <<
") " << e.what() );
521 catch( std::exception& e )
523 LERROR(
dlog,
"Standard exception caught while removing queue <" << a_queue_name <<
">: " << e.what() );
540 if( a_timeout_ms > 0 )
542 a_channel->BasicConsumeMessage( a_consumer_tag, a_envelope, a_timeout_ms );
546 a_envelope = a_channel->BasicConsumeMessage( a_consumer_tag );
550 if( a_do_ack ) a_channel->BasicAck( a_envelope );
559 catch( AmqpClient::ConnectionClosedException& e )
561 LERROR(
dlog,
"Fatal AMQP exception encountered: " << e.what() );
565 catch( AmqpClient::ConsumerCancelledException& e )
567 LERROR(
dlog,
"Fatal AMQP exception encountered: " << e.what() );
571 catch( AmqpClient::AmqpException& e )
573 if( e.is_soft_error() )
575 LWARN(
dlog,
"Non-fatal AMQP exception encountered: " << e.reply_text() );
579 LERROR(
dlog,
"Fatal AMQP exception encountered: " << e.reply_text() );
583 catch( std::exception& e )
585 LERROR(
dlog,
"Standard exception caught: " << e.what() );
591 LERROR(
dlog,
"Unknown exception caught" );
Error indicating a problem with the connection to the broker.
static bool bind_key(amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
sent_msg_pkg_ptr do_send(message_ptr_t a_message, const std::string &a_exchange, bool a_expect_reply, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
static bool setup_exchange(amqp_channel_ptr a_channel, const std::string &a_exchange)
virtual sent_msg_pkg_ptr send(request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
static void listen_for_message(amqp_envelope_ptr &a_envelope, post_listen_status &a_status, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
listen for a single AMQP message
@ message_received
A message was received, and the channel is still valid.
@ timeout
A timeout occurred, and the channel is still valid.
@ unknown
Initialized or unknown status.
@ soft_error
An error occurred, but the channel should still be valid.
@ hard_error
An error occurred, and the channel is no longer valid.
static std::string start_consuming(amqp_channel_ptr a_channel, const std::string &a_queue_name)
core(const scarab::param_node &a_config=dripline_config(), const scarab::authentication &a_auth=scarab::authentication(), const bool a_make_connection=true)
static bool stop_consuming(amqp_channel_ptr a_channel, std::string &a_consumer_tag)
amqp_channel_ptr open_channel() const
static bool setup_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
static bool remove_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Sets the default configuration used by core. These parameters pertain to the dripline mesh that will ...
Dripline-specific errors.
bool check_and_add_return_code(unsigned a_value, const std::string &a_name, const std::string &a_description)
AmqpClient::Channel::ptr_t amqp_channel_ptr
AmqpClient::AmqpLibraryException amqp_lib_exception
std::vector< amqp_message_ptr > amqp_split_message_ptrs
std::shared_ptr< msg_alert > alert_ptr_t
std::string to_string(op_t an_op)
Gives the human-readable version of a message operation.
std::shared_ptr< message > message_ptr_t
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
std::shared_ptr< msg_reply > reply_ptr_t
AmqpClient::BasicMessage::ptr_t amqp_message_ptr
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
std::shared_ptr< msg_request > request_ptr_t
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
AmqpClient::AmqpException amqp_exception
amqp_channel_ptr f_channel
std::string f_consumer_tag