Dripline-Cpp  v2.10.11
Dripline Implementation in C++
Public Types | Public Member Functions | Protected Types | Protected Member Functions | Private Member Functions | List of all members
service Class Reference

Primary unit of software that connects to a broker and typically provides an interface with an instrument or other software. More...

#include <service.hh>

Inheritance diagram for service:
Inheritance graph

Public Types

typedef std::map< std::string, endpoint_ptr_t > sync_map_t
 
typedef std::map< std::string, lr_ptr_t > async_map_t
 
- Public Types inherited from core
enum class  post_listen_status {
  unknown , message_received , timeout , soft_error ,
  hard_error
}
 
- Public Types inherited from scheduler< executor, clock >
using clock_t = clock
 
using time_point_t = typename clock::time_point
 
using duration_t = typename clock::duration
 
using executable_t = std::function< void() >
 
typedef std::multimap< time_point_t, event > events_map_t
 

Public Member Functions

 service (const scarab::param_node &a_config=service_config(), const scarab::authentication &a_auth=create_auth_with_dripline(true), const bool a_make_connection=true)
 
 service (const service &)=delete
 
 service (service &&a_orig)=default
 
virtual ~service ()
 
service & operator= (const service &)=delete
 
service & operator= (service &&a_orig)
 
 snake_case_mv_referrable (scarab::authentication, auth)
 
 snake_case_mv_accessible (status, status)
 
 snake_case_mv_accessible (bool, restart_on_error)
 
 snake_case_mv_accessible (bool, enable_scheduling)
 
bool add_child (endpoint_ptr_t a_endpoint_ptr)
 Add a synchronous child endpoint. More...
 
bool add_async_child (endpoint_ptr_t a_endpoint_ptr)
 Add an asynchronous child endpoint. More...
 
virtual sent_msg_pkg_ptr send (request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
 Sends a request message and returns a channel on which to listen for a reply. More...
 
virtual sent_msg_pkg_ptr send (reply_ptr_t a_reply, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
 Sends a reply message. More...
 
virtual sent_msg_pkg_ptr send (alert_ptr_t a_alert, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
 Sends an alert message. More...
 
virtual void run ()
 
bool start ()
 
bool listen ()
 
bool stop ()
 
virtual bool listen_on_queue ()
 
virtual void send_reply (reply_ptr_t a_reply) const
 Sends a reply message. More...
 
 snake_case_mv_accessible (uuid_t, id)
 
 snake_case_mv_referrable (sync_map_t, sync_children)
 
 snake_case_mv_referrable (async_map_t, async_children)
 
 snake_case_mv_referrable (std::string, broadcast_key)
 
- Public Member Functions inherited from core
 core (const scarab::param_node &a_config=dripline_config(), const scarab::authentication &a_auth=scarab::authentication(), const bool a_make_connection=true)
 
 core (const core &a_orig)=default
 
 core (core &&a_orig)=default
 
virtual ~core ()=default
 
core & operator= (const core &a_orig)=default
 
core & operator= (core &&a_orig)=default
 
 snake_case_mv_referrable (std::string, address)
 
 snake_case_mv_accessible (unsigned, port)
 
 snake_case_mv_referrable (std::string, username)
 
 snake_case_mv_referrable (std::string, password)
 
 snake_case_mv_referrable (std::string, requests_exchange)
 
 snake_case_mv_referrable (std::string, alerts_exchange)
 
 snake_case_mv_referrable (std::string, heartbeat_routing_key)
 
 snake_case_mv_accessible (unsigned, max_payload_size)
 
 snake_case_mv_accessible (bool, make_connection)
 
 snake_case_mv_accessible (unsigned, max_connection_attempts)
 
- Public Member Functions inherited from endpoint
 endpoint (const std::string &a_name)
 
 endpoint (const endpoint &a_orig)=default
 
 endpoint (endpoint &&a_orig)=default
 
virtual ~endpoint ()=default
 
endpoint & operator= (const endpoint &a_orig)=default
 
endpoint & operator= (endpoint &&a_orig)=default
 
 snake_case_mv_referrable (std::string, name)
 
 snake_case_mv_accessible (service *, service)
 
service & parent ()
 
const service & parent () const
 
reply_ptr_t submit_request_message (const request_ptr_t a_request)
 Directly submit a request message to this endpoint. More...
 
void submit_reply_message (const reply_ptr_t a_reply)
 Directly submit a reply message to this endpoint. More...
 
void submit_alert_message (const alert_ptr_t a_alert)
 Directly submit an alert message to this endpoint. More...
 
virtual void on_reply_message (const reply_ptr_t a_reply)
 
virtual void on_alert_message (const alert_ptr_t a_alert)
 
virtual reply_ptr_t do_run_request (const request_ptr_t a_request)
 
virtual reply_ptr_t do_get_request (const request_ptr_t a_request)
 
virtual reply_ptr_t do_set_request (const request_ptr_t a_request)
 
virtual reply_ptr_t do_cmd_request (const request_ptr_t a_request)
 
void sort_message (const message_ptr_t a_request)
 
uuid_t enable_lockout (const scarab::param_node &a_tag)
 enable lockout with randomly-generated key More...
 
uuid_t enable_lockout (const scarab::param_node &a_tag, uuid_t a_key)
 enable lockout with user-supplied key More...
 
bool disable_lockout (const uuid_t &a_key, bool a_force=false)
 
bool is_locked () const
 
bool check_key (const uuid_t &a_key) const
 
- Public Member Functions inherited from listener_receiver
 listener_receiver ()
 
 listener_receiver (const listener_receiver &)=delete
 
 listener_receiver (listener_receiver &&a_orig)
 
listener_receiver & operator= (const listener_receiver &)=delete
 
listener_receiver & operator= (listener_receiver &&a_orig)
 
- Public Member Functions inherited from listener
 listener ()
 
 listener (const listener &)=delete
 
 listener (listener &&a_orig)=default
 
virtual ~listener ()=default
 
listener & operator= (const listener &)=delete
 
listener & operator= (listener &&a_orig)
 
 snake_case_mv_referrable (amqp_channel_ptr, channel)
 
 snake_case_mv_referrable (std::string, consumer_tag)
 
 snake_case_mv_accessible (unsigned, listen_timeout_ms)
 
 snake_case_mv_referrable (std::thread, listener_thread)
 
- Public Member Functions inherited from concurrent_receiver
 concurrent_receiver ()
 
 concurrent_receiver (const concurrent_receiver &)=delete
 
 concurrent_receiver (concurrent_receiver &&a_orig)
 
virtual ~concurrent_receiver ()
 
concurrent_receiver & operator= (const concurrent_receiver &)=delete
 
concurrent_receiver & operator= (concurrent_receiver &&a_orig)
 
virtual void process_message (message_ptr_t a_message)
 Deposits the message in the concurrent queue (called by the listener) More...
 
void execute ()
 Handles messages that appear in the concurrent queue by calling submit_message(). More...
 
- Public Member Functions inherited from receiver
 receiver ()
 
 receiver (const receiver &a_orig)=delete
 
 receiver (receiver &&a_orig)=default
 
virtual ~receiver ()=default
 
receiver & operator= (const receiver &a_orig)=delete
 
receiver & operator= (receiver &&a_orig)
 
void handle_message_chunk (amqp_envelope_ptr a_envelope)
 
void wait_for_message (incoming_message_pack &a_pack, const std::string &a_message_id)
 
void process_message_pack (incoming_message_pack &a_pack, const std::string &a_message_id)
 Converts a message pack into a Dripline message, and then submits the message for processing. More...
 
 snake_case_mv_referrable (incoming_message_map, incoming_messages)
 Stores the incomplete messages. More...
 
 snake_case_mv_accessible (unsigned, single_message_wait_ms)
 Wait time for all message chunks from a single dripline message. More...
 
 snake_case_mv_accessible (unsigned, reply_listen_timeout_ms)
 Listen timeout for individual message chunks when waiting for replies. More...
 
reply_ptr_t wait_for_reply (const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0)
 
reply_ptr_t wait_for_reply (const sent_msg_pkg_ptr a_receive_reply, core::post_listen_status &a_status, int a_timeout_ms=0)
 
- Public Member Functions inherited from heartbeater
 heartbeater (service *a_service)
 Primary constructor. A service pointer is required to be able to send messages. More...
 
 heartbeater (const heartbeater &)=delete
 
 heartbeater (heartbeater &&a_orig)=default
 
virtual ~heartbeater ()=default
 
heartbeater & operator= (const heartbeater &)=delete
 
heartbeater & operator= (heartbeater &&a_orig)
 
void execute (const std::string &a_name, uuid_t a_id, const std::string &a_routing_key)
 
 snake_case_mv_accessible (unsigned, heartbeat_interval_s)
 Interval between heartbeat alerts (default: 60 s) More...
 
 snake_case_mv_accessible (unsigned, check_timeout_ms)
 Timing interval for the internal loop (default: 1000 ms) More...
 
 snake_case_mv_accessible (service *, service)
 
- Public Member Functions inherited from scheduler< executor, clock >
 scheduler ()
 
 scheduler (const scheduler &)=delete
 
 scheduler (scheduler &&)
 
virtual ~scheduler ()=default
 
scheduler & operator= (const scheduler &)=delete
 
scheduler & operator= (scheduler &&a_orig)
 
int schedule (executable_t an_executable, time_point_t an_exe_time)
 
int schedule (executable_t an_executable, duration_t an_interval, time_point_t an_exe_time=clock::now())
 
void unschedule (int an_id)
 Unschedule an event using the event's ID. More...
 
void execute ()
 Main execution loop for the scheduler. More...
 
 snake_case_mv_accessible (duration_t, exe_buffer)
 The time difference from "now" that determines whether an event is executed. More...
 
 snake_case_mv_accessible (duration_t, cycle_time)
 Main thread cycle time. More...
 
 snake_case_mv_referrable_const (executor, the_executor)
 The executor used to execute events.
More...
 
 snake_case_mv_referrable_const (events_map_t, events)
 The scheduled events, stored in a map sorted by execution time. More...
 

Protected Types

enum class  status {
  nothing = 0 , channel_created = 10 , exchange_declared = 20 , queue_declared = 30 ,
  queue_bound = 40 , consuming = 50 , listening = 60 , processing = 70
}
 

Protected Member Functions

virtual bool open_channels ()
 
virtual bool setup_queues ()
 
virtual bool bind_keys ()
 
virtual bool start_consuming ()
 
virtual bool stop_consuming ()
 
virtual bool remove_queue ()
 
virtual void submit_message (message_ptr_t a_message)
 Implementation of submit_message (from concurrent_receiver) More...
 
virtual reply_ptr_t on_request_message (const request_ptr_t a_request)
 Default request handler; passes request to initial request functions. More...
 
- Protected Member Functions inherited from core
sent_msg_pkg_ptr do_send (message_ptr_t a_message, const std::string &a_exchange, bool a_expect_reply, amqp_channel_ptr a_channel=amqp_channel_ptr()) const
 
amqp_channel_ptr send_withreply (message_ptr_t a_message, std::string &a_reply_consumer_tag, const std::string &a_exchange) const
 
bool send_noreply (message_ptr_t a_message, const std::string &a_exchange) const
 
amqp_channel_ptr open_channel () const
 
- Protected Member Functions inherited from endpoint
bool authenticate (const uuid_t &a_key) const
 Returns true if the server is unlocked or if it's locked and the key matches the lockout key; returns false otherwise. More...
 
 snake_case_mv_referrable (scarab::param_node, lockout_tag)
 
 snake_case_mv_accessible (uuid_t, lockout_key)
 
- Protected Member Functions inherited from concurrent_receiver
 snake_case_mv_referrable (scarab::concurrent_queue< message_ptr_t >, message_queue)
 
 snake_case_mv_referrable (std::thread, receiver_thread)
 
- Protected Member Functions inherited from receiver
reply_ptr_t process_received_reply (incoming_message_pack &a_pack, const std::string &a_message_id)
 

Private Member Functions

virtual void do_cancellation (int a_code)
 

Additional Inherited Members

- Static Public Member Functions inherited from core
static void listen_for_message (amqp_envelope_ptr &a_envelope, post_listen_status &a_status, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
 listen for a single AMQP message More...
 
- Public Attributes inherited from scheduler< executor, clock >
snake_case_mv_accessible_static(int, curr_id) protected std::recursive_mutex f_scheduler_mutex
 The ID to be used for the next scheduled event. More...
 
std::mutex f_executor_mutex
 
std::condition_variable_any f_cv
 
std::thread f_scheduler_thread
 
- Static Public Attributes inherited from core
static bool s_offline = false
 
- Static Protected Member Functions inherited from core
static bool setup_exchange (amqp_channel_ptr a_channel, const std::string &a_exchange)
 
static bool setup_queue (amqp_channel_ptr a_channel, const std::string &a_queue_name)
 
static bool bind_key (amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
 
static std::string start_consuming (amqp_channel_ptr a_channel, const std::string &a_queue_name)
 
static bool stop_consuming (amqp_channel_ptr a_channel, std::string &a_consumer_tag)
 
static bool remove_queue (amqp_channel_ptr a_channel, const std::string &a_queue_name)
 
- Protected Attributes inherited from heartbeater
std::thread f_heartbeat_thread
 

Detailed Description

Primary unit of software that connects to a broker and typically provides an interface with an instrument or other software.

Author
N.S. Oblath

The service class is the implementation of the "service" concept in Dripline. It's the primary component that makes up a Dripline mesh.

The lifetime of a service is defined by the three main functions:

  1. start() – create the AMQP channel, create the AMQP queue, bind the routing keys, and start consuming AMQP messages
  2. listen() – starts the heartbeat and scheduler threads (optional), starts the receiver thread, and waits for and handles messages on the queue
  3. stop() – (called asynchronously) cancels the listening service

The ability to handle and respond to Dripline messages is embodied in the endpoint class.
Service uses endpoint in three ways:

  1. Service is an endpoint. A service can be set up to handle messages directed to it.
  2. Service has basic child endpoints. These are also called "synchronous" endpoints.
    These endpoints use the same AMQP queue as the service itself. Messages sent to the service and to the synchronous endpoints are all handled serially.
  3. Service has asynchronous child endpoints. These endpoints each have their own AMQP queue and thread responsible for receiving and handling their messages.

A service has a number of key characteristics (most of which come from its parent classes):

  - core – Has all of the basic AMQP capabilities, sending messages, and making and manipulating connections
  - endpoint – Handles Dripline messages
  - listener_receiver – Asynchronously receives AMQP messages and turns them into Dripline messages
  - heartbeater – Sends periodic heartbeat messages
  - scheduler – Can schedule events

As is apparent from the above descriptions, a service is responsible for a number of threads when it executes:

  - Listening – grabs AMQP messages off the channel when they arrive
  - Message-wait – any incomplete multi-part Dripline message will set up a thread to wait until the message is complete, and then submits it for handling
  - Receiver – grabs completed Dripline messages and handles them
  - Async endpoint listening – same as above for each asynchronous endpoint
  - Async endpoint message-wait – same as above for each asynchronous endpoint
  - Async endpoint receiver – same as above for each asynchronous endpoint
  - Heartbeater – sends regular heartbeat messages
  - Scheduler – executes scheduled events

In addition to receiving messages from the broker, a user or client code can give messages directly to the service using process_message(message).

Definition at line 79 of file service.hh.

Member Typedef Documentation

◆ async_map_t

typedef std::map< std::string, lr_ptr_t > async_map_t

Definition at line 212 of file service.hh.

◆ sync_map_t

typedef std::map< std::string, endpoint_ptr_t > sync_map_t

Definition at line 209 of file service.hh.

Member Enumeration Documentation

◆ status

enum status
strong protected
Enumerator
nothing 
channel_created 
exchange_declared 
queue_declared 
queue_bound 
consuming 
listening 
processing 

Definition at line 87 of file service.hh.

Constructor & Destructor Documentation

◆ service() [1/3]

service ( const scarab::param_node &  a_config = service_config(),
const scarab::authentication &  a_auth = create_auth_with_dripline(true),
const bool  a_make_connection = true 
)

Definition at line 31 of file service.cc.

◆ service() [2/3]

service ( const service & )
delete

◆ service() [3/3]

service ( service &&  a_orig)
default

◆ ~service()

~service ( )
virtual

Definition at line 77 of file service.cc.

Member Function Documentation

◆ add_async_child()

bool add_async_child ( endpoint_ptr_t  a_endpoint_ptr)

Add an asynchronous child endpoint.

Definition at line 122 of file service.cc.

◆ add_child()

bool add_child ( endpoint_ptr_t  a_endpoint_ptr)

Add a synchronous child endpoint.

Definition at line 107 of file service.cc.

◆ bind_keys()

bool bind_keys ( )
protected virtual

Definition at line 386 of file service.cc.

◆ do_cancellation()

void do_cancellation ( int  a_code)
private virtual

Definition at line 581 of file service.cc.

◆ listen()

bool listen ( )

Starts listening on the queue for receiving messages. If no queue was created, this does nothing. If this returns false, the service should quit with an error

Definition at line 230 of file service.cc.

◆ listen_on_queue()

bool listen_on_queue ( )
virtual

Waits for AMQP messages arriving on the channel. Returns false if the return is due to an error in this function; returns true otherwise (namely because it was canceled).

Implements listener.

Definition at line 454 of file service.cc.

◆ on_request_message()

reply_ptr_t on_request_message ( const request_ptr_t  a_request)
protected virtual

Default request handler; passes request to initial request functions.

Reimplemented from endpoint.

Definition at line 556 of file service.cc.

◆ open_channels()

bool open_channels ( )
protected virtual

Definition at line 353 of file service.cc.

◆ operator=() [1/2]

service& operator= ( const service & )
delete

◆ operator=() [2/2]

service & operator= ( service &&  a_orig)

Definition at line 87 of file service.cc.

◆ remove_queue()

bool remove_queue ( )
protected virtual

Definition at line 440 of file service.cc.

◆ run()

void run ( )
virtual

Runs the service, which consists of three stages:

  1. Starting the service – sets up the connection with the broker
  2. Listens for messages – waits on the queue to receive messages, and then handles them
  3. Stops the service – breaks down everything that was setup in start()

Override this to customize what happens when a service runs.

Definition at line 142 of file service.cc.

◆ send() [1/3]

sent_msg_pkg_ptr send ( alert_ptr_t  a_alert,
amqp_channel_ptr  a_channel = amqp_channel_ptr() 
) const
inline virtual

Sends an alert message.

Reimplemented from core.

Definition at line 243 of file service.hh.

◆ send() [2/3]

sent_msg_pkg_ptr send ( reply_ptr_t  a_reply,
amqp_channel_ptr  a_channel = amqp_channel_ptr() 
) const
inline virtual

Sends a reply message.

Reimplemented from core.

Definition at line 235 of file service.hh.

◆ send() [3/3]

sent_msg_pkg_ptr send ( request_ptr_t  a_request,
amqp_channel_ptr  a_channel = amqp_channel_ptr() 
) const
inline virtual

Sends a request message and returns a channel on which to listen for a reply.

Reimplemented from core.

Definition at line 227 of file service.hh.

◆ send_reply()

void send_reply ( reply_ptr_t  a_reply) const
virtual

Sends a reply message.

Reimplemented from endpoint.

Definition at line 542 of file service.cc.

◆ setup_queues()

bool setup_queues ( )
protected virtual

Definition at line 370 of file service.cc.

◆ snake_case_mv_accessible() [1/4]

snake_case_mv_accessible ( bool  ,
enable_scheduling   
)

◆ snake_case_mv_accessible() [2/4]

snake_case_mv_accessible ( bool  ,
restart_on_error   
)

◆ snake_case_mv_accessible() [3/4]

snake_case_mv_accessible ( status  ,
status   
)

◆ snake_case_mv_accessible() [4/4]

snake_case_mv_accessible ( uuid_t  ,
id   
)

◆ snake_case_mv_referrable() [1/4]

snake_case_mv_referrable ( async_map_t  ,
async_children   
)

◆ snake_case_mv_referrable() [2/4]

snake_case_mv_referrable ( scarab::authentication  ,
auth   
)

◆ snake_case_mv_referrable() [3/4]

snake_case_mv_referrable ( std::string  ,
broadcast_key   
)

◆ snake_case_mv_referrable() [4/4]

snake_case_mv_referrable ( sync_map_t  ,
sync_children   
)

◆ start()

bool start ( )

Creates a channel to the broker and establishes the queue for receiving messages. If no queue name was given, this does nothing. If this returns false, the service should quit with an error

Definition at line 192 of file service.cc.

◆ start_consuming()

bool start_consuming ( )
protected virtual

Definition at line 411 of file service.cc.

◆ stop()

bool stop ( )

Stops receiving messages and closes the connection to the broker. If no queue was created, this does nothing. If this returns false, the service should quit with an error

Definition at line 328 of file service.cc.

◆ stop_consuming()

bool stop_consuming ( )
protected virtual

Definition at line 426 of file service.cc.

◆ submit_message()

void submit_message ( message_ptr_t  a_message)
protected virtual

Implementation of submit_message (from concurrent_receiver)

Implements concurrent_receiver.

Definition at line 511 of file service.cc.


The documentation for this class was generated from the following files: