Dripline-Cpp  v2.10.11
Dripline Implementation in C++
relayer.cc
Go to the documentation of this file.
1 #define DRIPLINE_API_EXPORTS
2 
3 #include "relayer.hh"
4 
5 #include "dripline_exceptions.hh"
6 
7 #include "logger.hh"
8 #include "param.hh"
9 
10 #include <chrono>
11 
12 namespace dripline
13 {
14  LOGGER( dlog, "relayer" );
15 
16  relayer::relayer( const scarab::param_node& a_config, const scarab::authentication& a_auth ) :
17  core( a_config["dripline_mesh"].as_node(), a_auth ),
18  scarab::cancelable(),
19  f_queue(),
20  f_msg_receiver()
21  {}
22 
24  {
25  core::operator=( std::move(a_orig) );
26  cancelable::operator=( std::move(a_orig) );
27 
28  f_queue = std::move( a_orig.f_queue );
29  f_msg_receiver = std::move( a_orig.f_msg_receiver );
30 
31  return *this;
32  }
33 
35  {
36  LDEBUG( dlog, "Dripline relayer starting" );
37  while( ! is_canceled() )
38  {
39  mar_ptr t_mar;
40  bool t_have_message = f_queue.timed_wait_and_pop( t_mar ); // blocking call for next message to send; timed so that cancellation can be rechecked
41  if( ! t_have_message ) continue;
42 
43  try
44  {
45  std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
46  switch( t_mar->f_message->message_type() )
47  {
48  case msg_t::request:
49  t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr = core::send( std::static_pointer_cast< dripline::msg_request >( t_mar->f_message ) );
50  break;
51  case msg_t::alert:
52  t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr = core::send( std::static_pointer_cast< dripline::msg_alert >( t_mar->f_message ) );
53  break;
54  default:
55  throw dripline_error() << "Unsupported message type: " << t_mar->f_message->message_type();
56  break;
57  }
58  if( ! t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr->f_successful_send )
59  {
60  LERROR( dlog, "Message sending failed: " << t_mar->f_wait_for_send_pkg->f_sent_msg_pkg_ptr->f_send_error_message << '\n' << *t_mar->f_message );
61  }
62  t_mar->f_wait_for_send_pkg->f_condition_var.notify_one();
63  continue;
64  }
65  catch( message_ptr_t )
66  {
67  LWARN( dlog, "Operating in offline mode; message not sent" );
68  }
69  catch( connection_error& e )
70  {
71  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() << '\n' << *t_mar->f_message );
72  }
73  catch( dripline_error& e )
74  {
75  LERROR( dlog, "Dripline error while sending reply:\n" << e.what() << '\n' << *t_mar->f_message );
76  }
77 
78  }
79 
80  LDEBUG( dlog, "Exiting the Dripline relayer" );
81 
82  return;
83  }
84 
85 
87  {
88  LDEBUG( dlog, "Canceling relayer" );
89  f_queue.interrupt();
90  return;
91  }
92 
94  {
95  if( is_canceled() )
96  {
97  LWARN( dlog, "Relayer has been canceled; request not sent" );
98  wait_for_send_pkg_ptr t_return;
99  t_return->f_sent_msg_pkg_ptr = std::make_shared< sent_msg_pkg >();
100  t_return->f_sent_msg_pkg_ptr->f_successful_send = false;
101  return t_return;
102  }
103  LDEBUG( dlog, "Sending request to <" << a_request->routing_key() << ">" );
104  mar_ptr t_mar = std::make_shared< message_and_reply >();
105  std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
106  t_mar->f_message = std::static_pointer_cast< dripline::message >( a_request );
107  t_mar->f_wait_for_send_pkg = std::make_shared< wait_for_send_pkg >();
108  f_queue.push( t_mar );
109  return t_mar->f_wait_for_send_pkg;
110  }
111 
113  {
114  if( is_canceled() )
115  {
116  LWARN( dlog, "Relayer has been canceled; request not sent" );
117  wait_for_send_pkg_ptr t_return;
118  t_return->f_sent_msg_pkg_ptr = std::make_shared< sent_msg_pkg >();
119  t_return->f_sent_msg_pkg_ptr->f_successful_send = false;
120  return t_return;
121  }
122  LDEBUG( dlog, "Sending alert to <" << a_alert->routing_key() << ">" );
123  mar_ptr t_mar = std::make_shared< message_and_reply >();
124  std::unique_lock< std::mutex > lock( t_mar->f_wait_for_send_pkg->f_mutex );
125  t_mar->f_message = std::static_pointer_cast< dripline::message >( a_alert );
126  t_mar->f_wait_for_send_pkg = std::make_shared< wait_for_send_pkg >();
127  f_queue.push( t_mar );
128  return t_mar->f_wait_for_send_pkg;
129  }
130 
131  reply_ptr_t relayer::wait_for_reply( const wait_for_send_pkg_ptr a_receive_reply, int a_timeout_ms )
132  {
134  return wait_for_reply( a_receive_reply, t_temp, a_timeout_ms );
135  }
136 
137  reply_ptr_t relayer::wait_for_reply( const wait_for_send_pkg_ptr a_receive_reply, core::post_listen_status& a_status, int a_timeout_ms )
138  {
139  std::unique_lock< std::mutex > t_lock( a_receive_reply->f_mutex );
140  auto t_deadline = std::chrono::system_clock::now() + std::chrono::milliseconds( a_timeout_ms );
141  while( ! a_receive_reply->f_sent_msg_pkg_ptr )
142  {
143  std::cv_status t_status = a_receive_reply->f_condition_var.wait_until( t_lock, t_deadline );
144  if( t_status == std::cv_status::timeout )
145  {
146  // timeout
147  return reply_ptr_t();
148  }
149  }
150  return f_msg_receiver.wait_for_reply( a_receive_reply->f_sent_msg_pkg_ptr, a_status, a_timeout_ms );
151  }
152 
153 }
Error indicating a problem with the connection to the broker.
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
Definition: core.hh:75
virtual sent_msg_pkg_ptr send(request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
Definition: core.cc:152
post_listen_status
Definition: core.hh:80
@ unknown
Initialized or unknown status.
core & operator=(const core &a_orig)=default
Dripline-specific errors.
Asynchronous message sending.
Definition: relayer.hh:38
reply_ptr_t wait_for_reply(const wait_for_send_pkg_ptr a_receive_reply, int a_timeout_ms=0)
Definition: relayer.cc:131
std::shared_ptr< message_and_reply > mar_ptr
Definition: relayer.hh:107
relayer & operator=(const relayer &)=delete
void execute_relayer()
Thread execution function: sends any messages that are submitted via the send functions.
Definition: relayer.cc:34
relayer(const scarab::param_node &a_config, const scarab::authentication &a_auth)
Definition: relayer.cc:16
std::shared_ptr< wait_for_send_pkg > wait_for_send_pkg_ptr
Definition: relayer.hh:72
wait_for_send_pkg_ptr send_async(request_ptr_t a_request) const
Definition: relayer.cc:93
scarab::concurrent_queue< mar_ptr > f_queue
Definition: relayer.hh:109
void do_cancellation(int a_code)
Definition: relayer.cc:86
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
Definition: agent.hh:18