Dripline-Cpp  v2.10.11
Dripline Implementation in C++
receiver.cc
Go to the documentation of this file.
1 /*
2  * receiver.cc
3  *
4  * Created on: Feb 18, 2019
5  * Author: N.S. Oblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "receiver.hh"
11 
12 #include "dripline_exceptions.hh"
13 #include "message.hh"
14 
15 #include "logger.hh"
16 #include "signal_handler.hh"
17 
18 LOGGER( dlog, "receiver" );
19 
20 namespace dripline
21 {
23  f_messages(),
24  f_chunks_received(),
25  f_routing_key(),
26  f_thread(),
27  f_mutex(),
28  f_conv(),
29  f_processing( false )
30  {}
31 
33  f_messages( std::move(a_orig.f_messages) ),
34  f_chunks_received( a_orig.f_chunks_received ),
35  f_routing_key( std::move(a_orig.f_routing_key) ),
36  f_thread( std::move(a_orig.f_thread) ),
37  f_mutex(),
38  f_conv(),
39  f_processing( a_orig.f_processing.load() )
40  {
41  a_orig.f_chunks_received = 0;
42  a_orig.f_processing.store( false );
43  }
44 
45 
47  scarab::cancelable(),
48  f_incoming_messages(),
49  f_single_message_wait_ms( 1000 ),
50  f_reply_listen_timeout_ms( 1000 )
51  {}
52 
54  {
55  cancelable::operator=( std::move(a_orig) );
56  f_incoming_messages = std::move(a_orig.f_incoming_messages);
57  f_single_message_wait_ms = a_orig.f_single_message_wait_ms;
58  f_reply_listen_timeout_ms = a_orig.f_reply_listen_timeout_ms;
59  return *this;
60  }
61 
63  {
64  try
65  {
66  amqp_message_ptr t_message = a_envelope->Message();
67  LDEBUG( dlog, "Received a message chunk <" << t_message->MessageId() );
68 
69  auto t_parsed_message_id = message::parse_message_id( t_message->MessageId() );
70  std::string t_message_id( std::get<0>(t_parsed_message_id) );
71  if( incoming_messages().count( t_message_id ) == 0 )
72  {
73  // this path: first chunk for this message
74  LDEBUG( dlog, "This is the first chunk for this message; creating new message pack" );
75  // create the new message_pack object
76  incoming_message_pack& t_pack = incoming_messages()[t_message_id];
77  // set the f_messages vector to the expected size
78  t_pack.f_messages.resize( std::get<2>(t_parsed_message_id) );
79  // put in place the first message chunk received
80  t_pack.f_messages[std::get<1>(t_parsed_message_id)] = t_message;
81  t_pack.f_routing_key = a_envelope->RoutingKey();
82  t_pack.f_chunks_received = 1;
83 
84  if( t_pack.f_messages.size() == 1 )
85  {
86  // if we only expect one chunk, we can bypass creating a separate thread, etc
87  LDEBUG( dlog, "Single-chunk message being sent directly to processing" );
88  process_message_pack( t_pack, t_message_id );
89  }
90  else
91  {
92  // start the thread to wait for message chunks
93  t_pack.f_thread = std::thread([this, &t_pack, &t_parsed_message_id](){ wait_for_message(t_pack, std::get<0>(t_parsed_message_id)); });
94  t_pack.f_thread.detach();
95  }
96  }
97  else
98  {
99  // this path: have already received chunks from this message
100  LDEBUG( dlog, "This is not the first chunk for this message; adding to message pack" );
101  incoming_message_pack& t_pack = incoming_messages()[std::get<0>(t_parsed_message_id)];
102  if( t_pack.f_processing.load() )
103  {
104  LWARN( dlog, "Message <" << std::get<0>(t_parsed_message_id) << "> is already being processed\n" <<
105  "Just received chunk " << std::get<1>(t_parsed_message_id) << " of " << std::get<2>(t_parsed_message_id) );
106  }
107  else
108  {
109  // lock mutex to access f_messages
110  std::unique_lock< std::mutex > t_lock( t_pack.f_mutex );
111  if( t_pack.f_messages[std::get<1>(t_parsed_message_id)] )
112  {
113  LWARN( dlog, "Received duplicate message chunk for message <" << std::get<0>(t_parsed_message_id) << ">; chunk " << std::get<1>(t_parsed_message_id) );
114  }
115  else
116  {
117  // add chunk to set of chunks
118  t_pack.f_messages[std::get<1>(t_parsed_message_id)] = t_message;
119  ++t_pack.f_chunks_received;
120  t_lock.unlock();
121  // inform the message-processing thread it should check whether it has the complete message
122  t_pack.f_conv.notify_one();
123  }
124  }
125  } // new/current message if/else block
126  }
127  catch( dripline_error& e )
128  {
129  LERROR( dlog, "Dripline exception caught while handling message chunk: " << e.what() );
130  }
131  catch( std::exception& e )
132  {
133  LERROR( dlog, "Standard exception caught while handling message chunk: " << e.what() );
134  }
135 
136  return;
137  }
138 
139  void receiver::wait_for_message( incoming_message_pack& a_pack, const std::string& a_message_id )
140  {
141  std::unique_lock< std::mutex > t_lock( a_pack.f_mutex );
142 
143  LDEBUG( dlog, "Waiting for message; chunks received: " << a_pack.f_chunks_received << " chunks expected: " << a_pack.f_messages.size() );
144 
145  // if the message is already complete, submit it for processing
146  if( a_pack.f_chunks_received == a_pack.f_messages.size() )
147  {
148  t_lock.release(); // process_message() will unlock the mutex before erasing the message pack
149  process_message_pack( a_pack, a_message_id );
150  return;
151  }
152 
153  auto t_now = std::chrono::system_clock::now();
154  while( a_pack.f_conv.wait_until( t_lock, t_now + std::chrono::milliseconds(f_single_message_wait_ms) ) == std::cv_status::no_timeout )
155  {
156  // if the message is complete during the waiting period, submit it for processing
157  if( a_pack.f_chunks_received == a_pack.f_messages.size() )
158  {
159  t_lock.release(); // process_message() will unlock the mutex before erasing the message pack
160  process_message_pack( a_pack, a_message_id );
161  return;
162  }
163  }
164 
165  // once the waiting period is over, submit it whether it's complete or not
166  t_lock.release(); // process_message() will unlock the mutex before erasing the message pack
167  LWARN( dlog, "Timed out; message may be incomplete" );
168  process_message_pack( a_pack, a_message_id );
169 
170  return;
171  }
172 
173  void receiver::process_message_pack( incoming_message_pack& a_pack, const std::string& a_message_id )
174  {
175  a_pack.f_processing.store( true );
176  try
177  {
178  message_ptr_t t_message = message::process_message( a_pack.f_messages, a_pack.f_routing_key );
179 
180  a_pack.f_mutex.unlock();
181  incoming_messages().erase( a_message_id );
182 
183  // if the message is not valid at this point, continue processing it, and we'll deal with it in the endpoint class
184 
185  this->process_message( t_message );
186 
187  return;
188  }
189  catch( dripline_error& e )
190  {
191  LERROR( dlog, "Dripline exception caught while processing message pack: " << e.what() );
192  }
193  catch( std::exception& e )
194  {
195  LERROR( dlog, "Standard exception caught while processing message pack: " << e.what() );
196  }
197 
198  return;
199  }
200 
202  {
203  throw dripline_error() << "Process_message function has not been implemented";
204  }
205 
206  reply_ptr_t receiver::wait_for_reply( const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms )
207  {
209  return wait_for_reply( a_receive_reply, t_temp, a_timeout_ms );
210  }
211 
212  reply_ptr_t receiver::wait_for_reply( const sent_msg_pkg_ptr a_receive_reply, core::post_listen_status& a_status, int a_timeout_ms )
213  {
214  if ( ! a_receive_reply->f_channel )
215  {
216  return reply_ptr_t();
217  }
218 
219  if( a_timeout_ms != 0 )
220  {
221  LDEBUG( dlog, "Waiting for a reply (timeout: " << a_timeout_ms << " ms)" );
222  }
223  else
224  {
225  LDEBUG( dlog, "Waiting for a reply (no timeout)" );
226  }
227 
228  // Assign the chunk timeout time; it should be f_reply_listen_timeout_ms unless a_timeout_ms is shorter than f_reply_listen_timeout_ms
229  unsigned t_chunk_timeout_ms = f_reply_listen_timeout_ms;
230  if( a_timeout_ms > 0 && a_timeout_ms < t_chunk_timeout_ms )
231  {
232  t_chunk_timeout_ms = a_timeout_ms;
233  }
234 
235  // for checking the wait_for_reply timeout
236  auto t_timeout_time = std::chrono::system_clock::now() + std::chrono::milliseconds(a_timeout_ms);
237 
238  // wait for messages until either:
239  // 1. the channel is no longer valid (return empty reply pointer; a_chan_valid will be false)
240  // 2. listening times out (return empty reply pointer; a_chan_valid will be true)
241  // 3. a full dripline message is received (return message)
242  // 4. error processing a recieved amqp message (return empty reply pointer)
243  while( ! is_canceled() && (a_timeout_ms == 0 || std::chrono::system_clock::now() < t_timeout_time) )
244  {
245  amqp_envelope_ptr t_envelope;
246  core::listen_for_message( t_envelope, a_status, a_receive_reply->f_channel, a_receive_reply->f_consumer_tag, t_chunk_timeout_ms, false );
247 
248  // check whether we canceled while listening
249  if( is_canceled() )
250  {
251  LDEBUG( dlog, "Receiver was canceled before receiving reply" );
252  return reply_ptr_t();
253  }
254 
255  // there was a soft error listening on the channel; no message received
256  if( a_status == core::post_listen_status::soft_error )
257  {
258  LWARN( dlog, "There was a soft error while listening for a reply; no message received" );
259  continue;
260  }
261 
262  // there was a hard error listening on the channel; no message received
263  if( a_status == core::post_listen_status::hard_error )
264  {
265  LERROR( dlog, "There was a hard error error while listening for a reply; no message received" );
266  return reply_ptr_t();
267  }
268 
269  // listening timed out
270  if( a_status == core::post_listen_status::timeout )
271  {
272  LTRACE( dlog, "Listening for reply message chunks timed out" );
273  // continue to wait for message chunks
274  continue;
275  }
276 
277  // unknown state; should not get here
278  if( a_status == core::post_listen_status::unknown )
279  {
280  LERROR( dlog, "An unknown status occurred while listening for messages" );
281  return reply_ptr_t();
282  }
283 
284  // because core::listen_for_message uses whether or not the envelope is empty to differentiate between a received message and a timeout,
285  // we will never have an empty envelope and a status == message_received
286 
287  // go ahead with message processing
288  try
289  {
290  amqp_message_ptr t_message = t_envelope->Message();
291  LDEBUG( dlog, "Received a message chunk <" << t_message->MessageId() );
292 
293  auto t_parsed_message_id = message::parse_message_id( t_message->MessageId() );
294  if( f_incoming_messages.count( std::get<0>(t_parsed_message_id) ) == 0 )
295  {
296  // this path: first chunk for this message
297  LDEBUG( dlog, "This is the first chunk for this message; creating new message pack" );
298  // create the new message_pack object
299  incoming_message_pack& t_pack = f_incoming_messages[std::get<0>(t_parsed_message_id)];
300  // set the f_messages vector to the expected size
301  t_pack.f_messages.resize( std::get<2>(t_parsed_message_id) );
302  // put in place the first message chunk received
303  t_pack.f_messages[std::get<1>(t_parsed_message_id)] = t_message;
304  t_pack.f_routing_key = t_envelope->RoutingKey();
305  t_pack.f_chunks_received = 1;
306 
307  if( t_pack.f_chunks_received == t_pack.f_messages.size() )
308  {
309  return process_received_reply( t_pack, std::get<0>(t_parsed_message_id) );
310  }
311  // else, need more chunks
312  }
313  else
314  {
315  // this path: have already received chunks from this message
316  LDEBUG( dlog, "This is not the first chunk for this message; adding to message pack" );
317  incoming_message_pack& t_pack = f_incoming_messages[std::get<0>(t_parsed_message_id)];
318  if( t_pack.f_processing.load() )
319  {
320  LWARN( dlog, "Message <" << std::get<0>(t_parsed_message_id) << "> is already being processed\n" <<
321  "Just received chunk " << std::get<1>(t_parsed_message_id) << " of " << std::get<2>(t_parsed_message_id) );
322  }
323  else
324  {
325  if( t_pack.f_messages[std::get<1>(t_parsed_message_id)] )
326  {
327  LWARN( dlog, "Received duplicate message chunk for message <" << std::get<0>(t_parsed_message_id) << ">; chunk " << std::get<1>(t_parsed_message_id) );
328  }
329  else
330  {
331  // add chunk to set of chunks
332  t_pack.f_messages[std::get<1>(t_parsed_message_id)] = t_message;
333  ++t_pack.f_chunks_received;
334  if( t_pack.f_chunks_received == t_pack.f_messages.size() )
335  {
336  return process_received_reply( t_pack, std::get<0>(t_parsed_message_id) );
337  }
338  }
339  }
340  }
341 
342  }
343  catch( dripline_error& e )
344  {
345  LERROR( dlog, "There was a problem processing the message: " << e.what() );
347  return reply_ptr_t();
348  }
349 
350  } // end while( ! is_canceled() && not timed out )
351 
352  // check if listening timed out
353  if( ! is_canceled() && std::chrono::system_clock::now() > t_timeout_time )
354  {
355  LINFO( dlog, "Listening for reply message timed out" );
357  return reply_ptr_t();
358  }
359 
360  LDEBUG( dlog, "Receiver was canceled" );
361  return reply_ptr_t();
362  }
363 
364  reply_ptr_t receiver::process_received_reply( incoming_message_pack& a_pack, const std::string& a_message_id )
365  {
366  a_pack.f_processing.store( true );
367  try
368  {
369  message_ptr_t t_message = message::process_message( a_pack.f_messages, a_pack.f_routing_key );
370 
371  f_incoming_messages.erase( a_message_id );
372 
373  if( t_message->is_reply() )
374  {
375  return std::static_pointer_cast< msg_reply >( t_message );
376  }
377  else
378  {
379  throw dripline_error() << "Non-reply message received";
380  }
381  }
382  catch( dripline_error& e )
383  {
384  LERROR( dlog, "Dripline exception caught while handling message: " << e.what() );
385  }
386  catch( amqp_exception& e )
387  {
388  LERROR( dlog, "AMQP exception caught while sending reply: (" << e.reply_code() << ") " << e.reply_text() );
389  }
390  catch( amqp_lib_exception& e )
391  {
392  LERROR( dlog, "AMQP Library Exception caught while sending reply: (" << e.ErrorCode() << ") " << e.what() );
393  }
394  catch( std::exception& e )
395  {
396  LERROR( dlog, "Standard exception caught while sending reply: " << e.what() );
397  }
398 
399  return reply_ptr_t();
400 
401  }
402 
404  receiver(),
405  f_message_queue()
406  {}
407 
409  receiver( std::move(a_orig) ),
410  f_message_queue()
411  {}
412 
414  {}
415 
417  {
418  receiver::operator=( std::move(a_orig) );
419  // nothing to do with message queue
420  return *this;
421  }
422 
424  {
425  f_message_queue.push( a_message );
426  return;
427  }
428 
430  {
431  try
432  {
433  while( ! is_canceled() )
434  {
435  message_ptr_t t_message;
436  if( f_message_queue.timed_wait_and_pop( t_message ) )
437  {
438  this->submit_message( t_message );
439  }
440  }
441  }
442  catch( const std::exception& e )
443  {
444  // shutdown gracefully on an exception
445  LERROR( dlog, "Exception caught; shutting down.\n" << "\t" << e.what() );
446  scarab::signal_handler::cancel_all( RETURN_ERROR );
447  }
448  }
449 
450 
451 
452 } /* namespace dripline */
Receives and processes messages concurrently.
Definition: receiver.hh:158
concurrent_receiver & operator=(const concurrent_receiver &)=delete
void execute()
Handles messages that appear in the concurrent queue by calling submit_message().
Definition: receiver.cc:429
virtual void submit_message(message_ptr_t a_message)=0
virtual void process_message(message_ptr_t a_message)
Deposits the message in the concurrent queue (called by the listener)
Definition: receiver.cc:423
static void listen_for_message(amqp_envelope_ptr &a_envelope, post_listen_status &a_status, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
listen for a single AMQP message
Definition: core.cc:528
post_listen_status
Definition: core.hh:80
@ timeout
A timeout occurred, and the channel is still valid.
@ unknown
Initialized or unknown status.
@ soft_error
An error occurred, but the channel should still be valid.
@ hard_error
An error occurred, and the channel is no longer valid.
Dripline-specific errors.
static message_ptr_t process_message(amqp_split_message_ptrs a_message_ptrs, const std::string &a_routing_key)
Converts a set of AMQP messages to a Dripline message object.
Definition: message.cc:128
static std::tuple< std::string, unsigned, unsigned > parse_message_id(const std::string &a_message_id)
Parses the message ID, which should be of the form [UUID]/[chunk]/[total chunks].
Definition: message.cc:114
A receiver is able to collect Dripline message chunks and reassemble them into a complete Dripline me...
Definition: receiver.hh:78
void process_message_pack(incoming_message_pack &a_pack, const std::string &a_message_id)
Converts a message pack into a Dripline message, and then submits the message for processing.
Definition: receiver.cc:173
receiver & operator=(const receiver &a_orig)=delete
void handle_message_chunk(amqp_envelope_ptr a_envelope)
Definition: receiver.cc:62
virtual void process_message(message_ptr_t a_message)
Definition: receiver.cc:201
reply_ptr_t wait_for_reply(const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0)
Definition: receiver.cc:206
reply_ptr_t process_received_reply(incoming_message_pack &a_pack, const std::string &a_message_id)
Definition: receiver.cc:364
void wait_for_message(incoming_message_pack &a_pack, const std::string &a_message_id)
Definition: receiver.cc:139
AmqpClient::AmqpLibraryException amqp_lib_exception
Definition: amqp.hh:29
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
AmqpClient::BasicMessage::ptr_t amqp_message_ptr
Definition: amqp.hh:26
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
Definition: amqp.hh:25
AmqpClient::AmqpException amqp_exception
Definition: amqp.hh:28
Definition: agent.hh:18
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("receiver", __FILE_NAME__, __LINE__)
Stores the basic information about a set of message chunks that will eventually make a Dripline messa...
Definition: receiver.hh:33
std::condition_variable f_conv
Definition: receiver.hh:39
std::atomic< bool > f_processing
Definition: receiver.hh:40
amqp_split_message_ptrs f_messages
Definition: receiver.hh:34