Dripline-Cpp  v2.10.11
Dripline Implementation in C++
service.hh
Go to the documentation of this file.
1 /*
2  * service.hh
3  *
4  * Created on: Jan 5, 2016
5  * Author: N.S. Oblath
6  */
7 
8 #ifndef DRIPLINE_SERVICE_HH_
9 #define DRIPLINE_SERVICE_HH_
10 
11 #include "core.hh"
12 #include "endpoint.hh"
13 #include "heartbeater.hh"
14 #include "scheduler.hh"
15 #include "listener.hh"
16 #include "receiver.hh"
17 
18 #include "dripline_exceptions.hh"
19 #include "service_config.hh"
20 #include "uuid.hh"
21 
22 #include <map>
23 #include <memory>
24 #include <set>
25 #include <vector>
26 
27 namespace scarab
28 {
29  class authentication;
30 }
31 namespace dripline
32 {
80  public core,
81  public endpoint,
82  public listener_receiver,
83  public heartbeater,
84  public scheduler<>
85  {
86  protected:
87  enum class status
88  {
89  nothing = 0,
90  channel_created = 10,
91  exchange_declared = 20,
92  queue_declared = 30,
93  queue_bound = 40,
94  consuming = 50,
95  listening = 60,
96  processing = 70
97  };
98 
99  public:
100  /*
101  \brief Extracts necessary configuration and authentication information and prepares the service to interact with the RabbitMQ broker. Does not initiate connection to the broker.
102  @param a_config Dripline configuration object. The `name` must be unique for each service. The `dripline.broker` (and `dripline.broker_port` if needed) should be made appropriate for the mesh.
103  The other parameters can be left as their defaults, or should be made uniform across the mesh.
104  - *Service parameters*
105  - `name` (string; default: dlcpp_service) -- Name of the service and the queue used by the service
106  - `restart_on_error` (bool; default: true) -- Flag for whether the service attempts to restart itself if an error occurs in communicating with the broker
107  - `enable_scheduling` (bool; default: false) -- Flag for enabling the scheduler
108  - `broadcast_key` (string; default: broadcast) -- Routing key used for broadcasts
109  - `loop_timeout_ms` (int; default: 1000) -- Maximum time used for listening timeouts (e.g. waiting for replies) in ms
110  - `message_wait_ms` (int; default: 1000) -- Maximum time used to wait for another AMQP message before declaring a DL message complete, in ms
111  - `heartbeat_interval_s` (int; default: 60) -- Interval between sending heartbeat messages in s
112  - *Dripline core parameters -- within the `dripline` config object*
113  - `dripline.broker` (string; default: localhost) -- Address of the RabbitMQ broker
114  - `dripline.broker_port` (int; default: 5672) -- Port used by the RabbitMQ broker
115  - `dripline.requests_exchange` (string; default: requests) -- Name of the exchange used for DL requests
116  - `dripline.alerts_exchange` (string; default: alerts) -- Name of the exchange used for DL alerts
117  - `dripline.heartbeat_routing_key` (string; default: heartbeat) -- Routing key used for sending heartbeats
118  - `dripline.max_payload_size` (int; default: DL_MAX_PAYLOAD_SIZE) -- Maximum size of payloads, in bytes
119  - `dripline.max_connection_attempts` (int; default: 10) -- Maximum number of attempts that will be made to connect to the broker
120  - `dripline.return_codes` (string or array of nodes; default: not present) -- Optional specification of additional return codes in the form of an array of nodes: `[{name: "<name>", value: <ret code>} <, ...>]`.
121  If this is a string, it's treated as a file that can be interpreted by the param system (e.g. YAML or JSON) using the previously-mentioned format
122  @param a_auth Authentication object (type scarab::authentication); authentication specification should be processed, and the authentication data should include:
123  @param a_make_connection Flag for whether or not to contact a broker; if false, this object operates in "dry-run" mode
124  */
125  service( const scarab::param_node& a_config = service_config(),
126  const scarab::authentication& a_auth = create_auth_with_dripline(true),
127  const bool a_make_connection = true );
128 // service( const bool a_make_connection, const scarab::param_node& a_config = scarab::param_node(), const scarab::authentication& a_auth = scarab::authentication() );
129  service( const service& ) = delete;
130  service( service&& a_orig ) = default;
131  virtual ~service();
132 
133  service& operator=( const service& ) = delete;
134  service& operator=( service&& a_orig );
135 
136  mv_referrable( scarab::authentication, auth );
137 
138  mv_accessible( status, status );
139  mv_accessible( bool, restart_on_error );
140  mv_accessible( bool, enable_scheduling );
141 
142  public:
144  bool add_child( endpoint_ptr_t a_endpoint_ptr );
145 
147  bool add_async_child( endpoint_ptr_t a_endpoint_ptr );
148 
149  public:
151  virtual sent_msg_pkg_ptr send( request_ptr_t a_request, amqp_channel_ptr a_channel = amqp_channel_ptr() ) const;
152 
154  virtual sent_msg_pkg_ptr send( reply_ptr_t a_reply, amqp_channel_ptr a_channel = amqp_channel_ptr() ) const;
155 
157  virtual sent_msg_pkg_ptr send( alert_ptr_t a_alert, amqp_channel_ptr a_channel = amqp_channel_ptr() ) const;
158 
159  public:
168  virtual void run();
169 
173  bool start();
174 
178  bool listen();
179 
183  bool stop();
184 
185  protected:
186  virtual bool open_channels();
187 
188  virtual bool setup_queues();
189 
190  virtual bool bind_keys();
191 
192  virtual bool start_consuming();
193 
194  virtual bool stop_consuming();
195 
196  virtual bool remove_queue();
197 
198  public:
201  virtual bool listen_on_queue();
202 
204  virtual void send_reply( reply_ptr_t a_reply ) const;
205 
206  mv_accessible( uuid_t, id );
207 
208  public:
209  typedef std::map< std::string, endpoint_ptr_t > sync_map_t;
210  mv_referrable( sync_map_t, sync_children );
211 
212  typedef std::map< std::string, lr_ptr_t > async_map_t;
213  mv_referrable( async_map_t, async_children );
214 
215  mv_referrable( std::string, broadcast_key );
216 
217  protected:
219  virtual void submit_message( message_ptr_t a_message );
220 
221  virtual reply_ptr_t on_request_message( const request_ptr_t a_request );
222 
223  private:
224  virtual void do_cancellation( int a_code );
225  };
226 
// Sends a request on behalf of this service: sets the request's sender service name to f_name,
// then delegates to core::send() with the caller-supplied channel and returns its result.
// a_request is dereferenced unconditionally, so it must not be null.
227  inline sent_msg_pkg_ptr service::send( request_ptr_t a_request, amqp_channel_ptr a_channel ) const
228  {
229  a_request->sender_service_name() = f_name;
230  // f_channel is deliberately not passed to core::send here: a channel can only be used in a single thread,
231  // and f_channel is primarily meant for listening with the listener thread.
232  return core::send( a_request, a_channel );
233  }
234 
// Sends a reply on behalf of this service: sets the reply's sender service name to f_name,
// then delegates to core::send() with the caller-supplied channel and returns its result.
// a_reply is dereferenced unconditionally, so it must not be null.
235  inline sent_msg_pkg_ptr service::send( reply_ptr_t a_reply, amqp_channel_ptr a_channel ) const
236  {
237  a_reply->sender_service_name() = f_name ;
238  // f_channel is deliberately not passed to core::send here: a channel can only be used in a single thread,
239  // and f_channel is primarily meant for listening with the listener thread.
240  return core::send( a_reply, a_channel );
241  }
242 
// Sends an alert on behalf of this service: sets the alert's sender service name to f_name,
// then delegates to core::send() with the caller-supplied channel and returns its result.
// a_alert is dereferenced unconditionally, so it must not be null.
243  inline sent_msg_pkg_ptr service::send( alert_ptr_t a_alert, amqp_channel_ptr a_channel ) const
244  {
245  a_alert->sender_service_name() = f_name;
246  // f_channel is deliberately not passed to core::send here: a channel can only be used in a single thread,
247  // and f_channel is primarily meant for listening with the listener thread.
248  return core::send( a_alert, a_channel );
249  }
250 
251 } /* namespace dripline */
252 
253 #endif /* DRIPLINE_SERVICE_HH_ */
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
Definition: core.hh:75
virtual sent_msg_pkg_ptr send(request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
Definition: core.cc:152
Basic Dripline object capable of receiving and acting on messages.
Definition: endpoint.hh:97
A heartbeater repeatedly sends an alert on a particular time interval.
Definition: heartbeater.hh:53
Convenience class to bring together listener and concurrent_receiver.
Definition: listener.hh:77
Executes scheduled events.
Definition: scheduler.hh:104
Sets the default service configuration.
Primary unit of software that connects to a broker and typically provides an interface with an instru...
Definition: service.hh:85
virtual sent_msg_pkg_ptr send(request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
Sends a request message and returns a channel on which to listen for a reply.
Definition: service.hh:227
service(const service &)=delete
std::map< std::string, endpoint_ptr_t > sync_map_t
Definition: service.hh:209
service & operator=(const service &)=delete
std::map< std::string, lr_ptr_t > async_map_t
Definition: service.hh:212
service(service &&a_orig)=default
#define DRIPLINE_API
Definition: dripline_api.hh:34
AmqpClient::Channel::ptr_t amqp_channel_ptr
Definition: amqp.hh:24
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
scarab::authentication create_auth_with_dripline(bool a_process_spec)
Create an authentication object with the default dripline authentication specification.
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
boost::uuids::uuid uuid_t
Universally-unique-identifier type containing 16 bytes (32 hexadecimal characters).
Definition: uuid.hh:26
std::shared_ptr< endpoint > endpoint_ptr_t
Definition: dripline_fwd.hh:39
Definition: agent.hh:18