Dripline-Cpp  v2.10.11
Dripline Implementation in C++
receiver.hh
Go to the documentation of this file.
1 /*
2  * receiver.hh
3  *
4  * Created on: Feb 18, 2019
5  * Author: N.S. Oblath
6  */
7 
8 #ifndef DRIPLINE_RECEIVER_HH_
9 #define DRIPLINE_RECEIVER_HH_
10 
11 #include "core.hh"
12 #include "dripline_api.hh"
13 #include "dripline_fwd.hh"
14 
15 #include "cancelable.hh"
16 #include "concurrent_queue.hh"
17 #include "member_variables.hh"
18 
19 #include <atomic>
20 #include <condition_variable>
21 #include <map>
22 #include <thread>
23 
24 namespace dripline
25 {
26 
33  {
36  std::string f_routing_key;
37  std::thread f_thread;
38  std::mutex f_mutex;
39  std::condition_variable f_conv;
40  std::atomic< bool > f_processing;
44  };
45  typedef std::map< std::string, incoming_message_pack > incoming_message_map;
46 
47 
48  // contains mechanisms for receiving messages synchronously
// receiver: collects Dripline message chunks and reassembles them into a complete Dripline message.
// Inherits virtually from scarab::cancelable.
77  class DRIPLINE_API receiver : public virtual scarab::cancelable
78  {
79  public:
80  receiver();
// Copy construction is deleted; move construction uses the compiler-generated default.
81  receiver( const receiver& a_orig ) = delete;
82  receiver( receiver&& a_orig ) = default;
83  virtual ~receiver() = default;
84 
// Copy assignment is deleted; move assignment is declared here without an inline definition.
85  receiver& operator=( const receiver& a_orig ) = delete;
86  receiver& operator=( receiver&& a_orig );
87 
88  public:
// Takes one AMQP envelope (amqp_envelope_ptr).
// NOTE(review): presumably files the chunk into incoming_messages until the message is complete -- confirm in the implementation.
92  void handle_message_chunk( amqp_envelope_ptr a_envelope );
93 
// Both functions operate on one incoming_message_pack, identified by a_message_id.
// NOTE(review): presumably wait_for_message waits for remaining chunks and process_message_pack assembles and dispatches the message -- confirm.
96  void wait_for_message( incoming_message_pack& a_pack, const std::string& a_message_id );
98  void process_message_pack( incoming_message_pack& a_pack, const std::string& a_message_id );
99 
// Virtual hook that receives a message (message_ptr_t); derived classes can override it.
102  virtual void process_message( message_ptr_t a_message );
103 
// incoming_messages is a std::map< std::string, incoming_message_pack >.
// NOTE(review): the string key is presumably the message ID -- confirm.
105  mv_referrable( incoming_message_map, incoming_messages );
// NOTE(review): the _ms suffix suggests both values are in milliseconds; defaults are not visible here -- confirm.
107  mv_accessible( unsigned, single_message_wait_ms );
109  mv_accessible( unsigned, reply_listen_timeout_ms );
110 
111 
112  public:
// Two overloads returning a reply_ptr_t for a sent-message package; a_timeout_ms defaults to 0 in both.
// The second overload additionally takes a core::post_listen_status by non-const reference.
// NOTE(review): the meaning of a_timeout_ms == 0 is not visible in this header -- confirm.
120  reply_ptr_t wait_for_reply( const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms = 0 );
129  reply_ptr_t wait_for_reply( const sent_msg_pkg_ptr a_receive_reply, core::post_listen_status& a_status, int a_timeout_ms = 0 );
130 
131  protected:
// Protected helper that returns a reply_ptr_t for the given message pack and message ID.
132  reply_ptr_t process_received_reply( incoming_message_pack& a_pack, const std::string& a_message_id );
133 
134  };
135 
158  {
159  public:
163  virtual ~concurrent_receiver();
164 
166  concurrent_receiver& operator=( concurrent_receiver&& a_orig );
167 
168  public:
170  virtual void process_message( message_ptr_t a_message );
171 
173  void execute();
174 
175  protected:
178  virtual void submit_message( message_ptr_t a_message ) = 0;
179 
180  mv_referrable( scarab::concurrent_queue< message_ptr_t >, message_queue );
181  mv_referrable( std::thread, receiver_thread );
182  };
183 
184 } /* namespace dripline */
185 
186 #endif /* DRIPLINE_RECEIVER_HH_ */
Receives and processes messages concurrently.
Definition: receiver.hh:158
concurrent_receiver(const concurrent_receiver &)=delete
concurrent_receiver & operator=(const concurrent_receiver &)=delete
virtual void submit_message(message_ptr_t a_message)=0
post_listen_status
Definition: core.hh:80
A receiver is able to collect Dripline message chunks and reassemble them into a complete Dripline message.
Definition: receiver.hh:78
virtual ~receiver()=default
receiver(const receiver &a_orig)=delete
receiver & operator=(const receiver &a_orig)=delete
receiver(receiver &&a_orig)=default
#define DRIPLINE_API
Definition: dripline_api.hh:34
std::vector< amqp_message_ptr > amqp_split_message_ptrs
Definition: amqp.hh:31
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
std::map< std::string, incoming_message_pack > incoming_message_map
Definition: receiver.hh:45
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
Definition: amqp.hh:25
Stores the basic information about a set of message chunks that will eventually make a Dripline message.
Definition: receiver.hh:33
incoming_message_pack(const incoming_message_pack &)=delete
std::condition_variable f_conv
Definition: receiver.hh:39
std::atomic< bool > f_processing
Definition: receiver.hh:40
amqp_split_message_ptrs f_messages
Definition: receiver.hh:34