1 #define DRIPLINE_API_EXPORTS
16 relayer::relayer(
const scarab::param_node& a_config,
const scarab::authentication& a_auth ) :
17 core( a_config[
"dripline_mesh"].as_node(), a_auth ),
26 cancelable::operator=( std::move(a_orig) );
28 f_queue = std::move( a_orig.f_queue );
29 f_msg_receiver = std::move( a_orig.f_msg_receiver );
36 LDEBUG(
dlog,
"Dripline relayer starting" );
37 while( ! is_canceled() )
40 bool t_have_message =
f_queue.timed_wait_and_pop( t_mar );
41 if( ! t_have_message )
continue;
45 std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
46 switch( t_mar->f_message->message_type() )
49 t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr =
core::send( std::static_pointer_cast< dripline::msg_request >( t_mar->f_message ) );
52 t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr =
core::send( std::static_pointer_cast< dripline::msg_alert >( t_mar->f_message ) );
55 throw dripline_error() <<
"Unsupported message type: " << t_mar->f_message->message_type();
58 if( ! t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr->f_successful_send )
60 LERROR(
dlog,
"Message sending failed: " << t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr->f_send_error_message <<
'\n' << *t_mar->f_message );
62 t_mar->f_wait_for_send_pkg->f_condition_var.notify_one();
67 LWARN(
dlog,
"Operating in offline mode; message not sent" );
71 LERROR(
dlog,
"Unable to connect to the broker:\n" << e.what() <<
'\n' << *t_mar->f_message );
75 LERROR(
dlog,
"Dripline error while sending reply:\n" << e.what() <<
'\n' << *t_mar->f_message );
80 LDEBUG(
dlog,
"Exiting the Dripline relayer" );
88 LDEBUG(
dlog,
"Canceling relayer" );
97 LWARN(
dlog,
"Relayer has been canceled; request not sent" );
99 t_return->f_sent_msg_pkg_ptr = std::make_shared< sent_msg_pkg >();
100 t_return->f_sent_msg_pkg_ptr->f_successful_send =
false;
103 LDEBUG(
dlog,
"Sending request to <" << a_request->routing_key() <<
">" );
104 mar_ptr t_mar = std::make_shared< message_and_reply >();
105 std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
106 t_mar->f_message = std::static_pointer_cast< dripline::message >( a_request );
107 t_mar->f_wait_for_send_pkg = std::make_shared< wait_for_send_pkg >();
109 return t_mar->f_wait_for_send_pkg;
116 LWARN(
dlog,
"Relayer has been canceled; request not sent" );
118 t_return->f_sent_msg_pkg_ptr = std::make_shared< sent_msg_pkg >();
119 t_return->f_sent_msg_pkg_ptr->f_successful_send =
false;
122 LDEBUG(
dlog,
"Sending alert to <" << a_alert->routing_key() <<
">" );
123 mar_ptr t_mar = std::make_shared< message_and_reply >();
124 std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
125 t_mar->f_message = std::static_pointer_cast< dripline::message >( a_alert );
126 t_mar->f_wait_for_send_pkg = std::make_shared< wait_for_send_pkg >();
128 return t_mar->f_wait_for_send_pkg;
139 std::unique_lock< std::mutex > t_lock( a_receive_reply->f_mutex );
140 auto t_deadline = std::chrono::system_clock::now() + std::chrono::milliseconds( a_timeout_ms );
141 while( ! a_receive_reply->f_sent_msg_pkg_ptr )
143 std::cv_status t_status = a_receive_reply->f_condition_var.wait_until( t_lock, t_deadline );
144 if( t_status == std::cv_status::timeout )
150 return f_msg_receiver.wait_for_reply( a_receive_reply->f_sent_msg_pkg_ptr, a_status, a_timeout_ms );
Error indicating a problem with the connection to the broker.
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
virtual sent_msg_pkg_ptr send(request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
@ unknown
Initialized or unknown status.
core & operator=(const core &a_orig)=default
Dripline-specific errors.
Asynchronous message sending.
reply_ptr_t wait_for_reply(const wait_for_send_pkg_ptr a_receive_reply, int a_timeout_ms=0)
std::shared_ptr< message_and_reply > mar_ptr
relayer & operator=(const relayer &)=delete
void execute_relayer()
Thread execution function: sends any messages that are submitted via the send functions.
relayer(const scarab::param_node &a_config, const scarab::authentication &a_auth)
std::shared_ptr< wait_for_send_pkg > wait_for_send_pkg_ptr
wait_for_send_pkg_ptr send_async(request_ptr_t a_request) const
scarab::concurrent_queue< mar_ptr > f_queue
void do_cancellation(int a_code)
std::shared_ptr< msg_alert > alert_ptr_t
std::shared_ptr< message > message_ptr_t
std::shared_ptr< msg_reply > reply_ptr_t
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
std::shared_ptr< msg_request > request_ptr_t