![]() |
Dripline-Cpp
v2.10.11
Dripline Implementation in C++
|
Primary unit of software that connects to a broker and typically provides an interface with an instrument or other software. More...
#include <service.hh>

Public Types | |
| typedef std::map< std::string, endpoint_ptr_t > | sync_map_t |
| typedef std::map< std::string, lr_ptr_t > | async_map_t |
Public Types inherited from core | |
| enum class | post_listen_status { unknown , message_received , timeout , soft_error , hard_error } |
Public Types inherited from scheduler< executor, clock > | |
| using | clock_t = clock |
| using | time_point_t = typename clock::time_point |
| using | duration_t = typename clock::duration |
| using | executable_t = std::function< void() > |
| typedef std::multimap< time_point_t, event > | events_map_t |
Public Member Functions | |
| service (const scarab::param_node &a_config=service_config(), const scarab::authentication &a_auth=create_auth_with_dripline(true), const bool a_make_connection=true) | |
| service (const service &)=delete | |
| service (service &&a_orig)=default | |
| virtual | ~service () |
| service & | operator= (const service &)=delete |
| service & | operator= (service &&a_orig) |
| snake_case_mv_referrable (scarab::authentication, auth) | |
| snake_case_mv_accessible (status, status) | |
| snake_case_mv_accessible (bool, restart_on_error) | |
| snake_case_mv_accessible (bool, enable_scheduling) | |
| bool | add_child (endpoint_ptr_t a_endpoint_ptr) |
| Add a synchronous child endpoint. More... | |
| bool | add_async_child (endpoint_ptr_t a_endpoint_ptr) |
| Add an asynchronous child endpoint. More... | |
| virtual sent_msg_pkg_ptr | send (request_ptr_t a_request, amqp_channel_ptr a_channel=amqp_channel_ptr()) const |
| Sends a request message and returns a channel on which to listen for a reply. More... | |
| virtual sent_msg_pkg_ptr | send (reply_ptr_t a_reply, amqp_channel_ptr a_channel=amqp_channel_ptr()) const |
| Sends a reply message. More... | |
| virtual sent_msg_pkg_ptr | send (alert_ptr_t a_alert, amqp_channel_ptr a_channel=amqp_channel_ptr()) const |
| Sends an alert message. More... | |
| virtual void | run () |
| bool | start () |
| bool | listen () |
| bool | stop () |
| virtual bool | listen_on_queue () |
| virtual void | send_reply (reply_ptr_t a_reply) const |
| Sends a reply message. More... | |
| snake_case_mv_accessible (uuid_t, id) | |
| snake_case_mv_referrable (sync_map_t, sync_children) | |
| snake_case_mv_referrable (async_map_t, async_children) | |
| snake_case_mv_referrable (std::string, broadcast_key) | |
Public Member Functions inherited from core | |
| core (const scarab::param_node &a_config=dripline_config(), const scarab::authentication &a_auth=scarab::authentication(), const bool a_make_connection=true) | |
| core (const core &a_orig)=default | |
| core (core &&a_orig)=default | |
| virtual | ~core ()=default |
| core & | operator= (const core &a_orig)=default |
| core & | operator= (core &&a_orig)=default |
| snake_case_mv_referrable (std::string, address) | |
| snake_case_mv_accessible (unsigned, port) | |
| snake_case_mv_referrable (std::string, username) | |
| snake_case_mv_referrable (std::string, password) | |
| snake_case_mv_referrable (std::string, requests_exchange) | |
| snake_case_mv_referrable (std::string, alerts_exchange) | |
| snake_case_mv_referrable (std::string, heartbeat_routing_key) | |
| snake_case_mv_accessible (unsigned, max_payload_size) | |
| snake_case_mv_accessible (bool, make_connection) | |
| snake_case_mv_accessible (unsigned, max_connection_attempts) | |
Public Member Functions inherited from endpoint | |
| endpoint (const std::string &a_name) | |
| endpoint (const endpoint &a_orig)=default | |
| endpoint (endpoint &&a_orig)=default | |
| virtual | ~endpoint ()=default |
| endpoint & | operator= (const endpoint &a_orig)=default |
| endpoint & | operator= (endpoint &&a_orig)=default |
| snake_case_mv_referrable (std::string, name) | |
| snake_case_mv_accessible (service *, service) | |
| service & | parent () |
| const service & | parent () const |
| reply_ptr_t | submit_request_message (const request_ptr_t a_request) |
| Directly submit a request message to this endpoint. More... | |
| void | submit_reply_message (const reply_ptr_t a_reply) |
| Directly submit a reply message to this endpoint. More... | |
| void | submit_alert_message (const alert_ptr_t a_alert) |
| Directly submit an alert message to this endpoint. More... | |
| virtual void | on_reply_message (const reply_ptr_t a_reply) |
| virtual void | on_alert_message (const alert_ptr_t a_alert) |
| virtual reply_ptr_t | do_run_request (const request_ptr_t a_request) |
| virtual reply_ptr_t | do_get_request (const request_ptr_t a_request) |
| virtual reply_ptr_t | do_set_request (const request_ptr_t a_request) |
| virtual reply_ptr_t | do_cmd_request (const request_ptr_t a_request) |
| void | sort_message (const message_ptr_t a_request) |
| uuid_t | enable_lockout (const scarab::param_node &a_tag) |
| enable lockout with randomly-generated key More... | |
| uuid_t | enable_lockout (const scarab::param_node &a_tag, uuid_t a_key) |
| enable lockout with user-supplied key More... | |
| bool | disable_lockout (const uuid_t &a_key, bool a_force=false) |
| bool | is_locked () const |
| bool | check_key (const uuid_t &a_key) const |
Public Member Functions inherited from listener_receiver | |
| listener_receiver () | |
| listener_receiver (const listener_receiver &)=delete | |
| listener_receiver (listener_receiver &&a_orig) | |
| listener_receiver & | operator= (const listener_receiver &)=delete |
| listener_receiver & | operator= (listener_receiver &&a_orig) |
Public Member Functions inherited from listener | |
| listener () | |
| listener (const listener &)=delete | |
| listener (listener &&a_orig)=default | |
| virtual | ~listener ()=default |
| listener & | operator= (const listener &)=delete |
| listener & | operator= (listener &&a_orig) |
| snake_case_mv_referrable (amqp_channel_ptr, channel) | |
| snake_case_mv_referrable (std::string, consumer_tag) | |
| snake_case_mv_accessible (unsigned, listen_timeout_ms) | |
| snake_case_mv_referrable (std::thread, listener_thread) | |
Public Member Functions inherited from concurrent_receiver | |
| concurrent_receiver () | |
| concurrent_receiver (const concurrent_receiver &)=delete | |
| concurrent_receiver (concurrent_receiver &&a_orig) | |
| virtual | ~concurrent_receiver () |
| concurrent_receiver & | operator= (const concurrent_receiver &)=delete |
| concurrent_receiver & | operator= (concurrent_receiver &&a_orig) |
| virtual void | process_message (message_ptr_t a_message) |
| Deposits the message in the concurrent queue (called by the listener) More... | |
| void | execute () |
Handles messages that appear in the concurrent queue by calling submit_message(). More... | |
Public Member Functions inherited from receiver | |
| receiver () | |
| receiver (const receiver &a_orig)=delete | |
| receiver (receiver &&a_orig)=default | |
| virtual | ~receiver ()=default |
| receiver & | operator= (const receiver &a_orig)=delete |
| receiver & | operator= (receiver &&a_orig) |
| void | handle_message_chunk (amqp_envelope_ptr a_envelope) |
| void | wait_for_message (incoming_message_pack &a_pack, const std::string &a_message_id) |
| void | process_message_pack (incoming_message_pack &a_pack, const std::string &a_message_id) |
| Converts a message pack into a Dripline message, and then submits the message for processing. More... | |
| snake_case_mv_referrable (incoming_message_map, incoming_messages) | |
| Stores the incomplete messages. More... | |
| snake_case_mv_accessible (unsigned, single_message_wait_ms) | |
| Wait time for all message chunks from a single dripline message. More... | |
| snake_case_mv_accessible (unsigned, reply_listen_timeout_ms) | |
| Listen timeout for individual message chunks when waiting for replies. More... | |
| reply_ptr_t | wait_for_reply (const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0) |
| reply_ptr_t | wait_for_reply (const sent_msg_pkg_ptr a_receive_reply, core::post_listen_status &a_status, int a_timeout_ms=0) |
Public Member Functions inherited from heartbeater | |
| heartbeater (service *a_service) | |
| Primary constructor. A service pointer is required to be able to send messages. More... | |
| heartbeater (const heartbeater &)=delete | |
| heartbeater (heartbeater &&a_orig)=default | |
| virtual | ~heartbeater ()=default |
| heartbeater & | operator= (const heartbeater &)=delete |
| heartbeater & | operator= (heartbeater &&a_orig) |
| void | execute (const std::string &a_name, uuid_t a_id, const std::string &a_routing_key) |
| snake_case_mv_accessible (unsigned, heartbeat_interval_s) | |
| Interval between heartbeat alerts (default: 60 s) More... | |
| snake_case_mv_accessible (unsigned, check_timeout_ms) | |
| Timing interval for the internal loop (default: 1000 ms) More... | |
| snake_case_mv_accessible (service *, service) | |
Public Member Functions inherited from scheduler< executor, clock > | |
| scheduler () | |
| scheduler (const scheduler &)=delete | |
| scheduler (scheduler &&) | |
| virtual | ~scheduler ()=default |
| scheduler & | operator= (const scheduler &)=delete |
| scheduler & | operator= (scheduler &&a_orig) |
| int | schedule (executable_t an_executable, time_point_t an_exe_time) |
| int | schedule (executable_t an_executable, duration_t an_interval, time_point_t an_exe_time=clock::now()) |
| void | unschedule (int an_id) |
| Unschedule an event using the event's ID. More... | |
| void | execute () |
| Main execution loop for the scheduler. More... | |
| snake_case_mv_accessible (duration_t, exe_buffer) | |
| The time difference from "now" that determines whether an event is executed. More... | |
| snake_case_mv_accessible (duration_t, cycle_time) | |
| Main thread cycle time. More... | |
| snake_case_mv_referrable_const (executor, the_executor) | |
| The executor used to execute events. More... | |
| snake_case_mv_referrable_const (events_map_t, events) | |
| The scheduled events, stored in a map sorted by execution time. More... | |
Protected Types | |
| enum class | status { nothing = 0 , channel_created = 10 , exchange_declared = 20 , queue_declared = 30 , queue_bound = 40 , consuming = 50 , listening = 60 , processing = 70 } |
Protected Member Functions | |
| virtual bool | open_channels () |
| virtual bool | setup_queues () |
| virtual bool | bind_keys () |
| virtual bool | start_consuming () |
| virtual bool | stop_consuming () |
| virtual bool | remove_queue () |
| virtual void | submit_message (message_ptr_t a_message) |
| Implementation of submit_message (from concurrent_receiver) More... | |
| virtual reply_ptr_t | on_request_message (const request_ptr_t a_request) |
| Default request handler; passes request to initial request functions. More... | |
Protected Member Functions inherited from core | |
| sent_msg_pkg_ptr | do_send (message_ptr_t a_message, const std::string &a_exchange, bool a_expect_reply, amqp_channel_ptr a_channel=amqp_channel_ptr()) const |
| amqp_channel_ptr | send_withreply (message_ptr_t a_message, std::string &a_reply_consumer_tag, const std::string &a_exchange) const |
| bool | send_noreply (message_ptr_t a_message, const std::string &a_exchange) const |
| amqp_channel_ptr | open_channel () const |
Protected Member Functions inherited from endpoint | |
| bool | authenticate (const uuid_t &a_key) const |
| Returns true if the server is unlocked or if it's locked and the key matches the lockout key; returns false otherwise. More... | |
| snake_case_mv_referrable (scarab::param_node, lockout_tag) | |
| snake_case_mv_accessible (uuid_t, lockout_key) | |
Protected Member Functions inherited from concurrent_receiver | |
| snake_case_mv_referrable (scarab::concurrent_queue< message_ptr_t >, message_queue) | |
| snake_case_mv_referrable (std::thread, receiver_thread) | |
Protected Member Functions inherited from receiver | |
| reply_ptr_t | process_received_reply (incoming_message_pack &a_pack, const std::string &a_message_id) |
Private Member Functions | |
| virtual void | do_cancellation (int a_code) |
Additional Inherited Members | |
Static Public Member Functions inherited from core | |
| static void | listen_for_message (amqp_envelope_ptr &a_envelope, post_listen_status &a_status, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true) |
| listen for a single AMQP message More... | |
Public Attributes inherited from scheduler< executor, clock > | |
| snake_case_mv_accessible_static(int, curr_id) protected std::recursive_mutex | f_scheduler_mutex |
| The ID to be used for the next scheduled event. More... | |
| std::mutex | f_executor_mutex |
| std::condition_variable_any | f_cv |
| std::thread | f_scheduler_thread |
Static Public Attributes inherited from core | |
| static bool | s_offline = false |
Static Protected Member Functions inherited from core | |
| static bool | setup_exchange (amqp_channel_ptr a_channel, const std::string &a_exchange) |
| static bool | setup_queue (amqp_channel_ptr a_channel, const std::string &a_queue_name) |
| static bool | bind_key (amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key) |
| static std::string | start_consuming (amqp_channel_ptr a_channel, const std::string &a_queue_name) |
| static bool | stop_consuming (amqp_channel_ptr a_channel, std::string &a_consumer_tag) |
| static bool | remove_queue (amqp_channel_ptr a_channel, const std::string &a_queue_name) |
Protected Attributes inherited from heartbeater | |
| std::thread | f_heartbeat_thread |
Primary unit of software that connects to a broker and typically provides an interface with an instrument or other software.
The service class is the implementation of the "service" concept in Dripline. It's the primary component that makes up a Dripline mesh.
The lifetime of a service is defined by the three main functions:
start() – create the AMQP channel, create the AMQP queue, bind the routing keys, and start consuming AMQP messageslisten() – starts the heartbeat and scheduler threads (optional), starts the receiver thread, and waits for and handles messages on the queuestop() – (called asynchronously) cancels the listening serviceThe ability to handle and respond to Dripline messages is embodied in the endpoint class.
Service uses endoint in three ways:
A service has a number of key characteristics (most of which come from its parent classes): core – Has all of the basic AMQP capabilities, sending messages, and making and manipulating connections endpoint – Handles Dripline messages listener_receiver – Asynchronously recieves AMQP messages and turns them into Dripline messages heartbeater – Sends periodic heartbeat messages scheduler – Can schedule events
As is apparent from the above descriptions, a service is responsible for a number of threads when it executes: Listening – grabs AMQP messages off the channel when they arrive Message-wait – any incomplete multi-part Dripline message will setup a thread to wait until the message is complete, and then submits it for handling Receiver – grabs completed Dripline messages and handles it Async endpoint listening – same as abovefor each asynchronous endpoint Async endpoint message-wait – same as above for each asynchronous endpoint Async endpoint receiver – same as above for each asynchronous endpoint Heatbeater – sends regular heartbeat messages Scheduler – executes scheduled events
In addition to receiving messages from the broker, a user or client code can give messages directly to the service using process_message(message).
Definition at line 79 of file service.hh.
| typedef std::map< std::string, lr_ptr_t > async_map_t |
Definition at line 212 of file service.hh.
| typedef std::map< std::string, endpoint_ptr_t > sync_map_t |
Definition at line 209 of file service.hh.
|
strongprotected |
| Enumerator | |
|---|---|
| nothing | |
| channel_created | |
| exchange_declared | |
| queue_declared | |
| queue_bound | |
| consuming | |
| listening | |
| processing | |
Definition at line 87 of file service.hh.
| service | ( | const scarab::param_node & | a_config = service_config(), |
| const scarab::authentication & | a_auth = create_auth_with_dripline(true), |
||
| const bool | a_make_connection = true |
||
| ) |
Definition at line 31 of file service.cc.
|
virtual |
Definition at line 77 of file service.cc.
| bool add_async_child | ( | endpoint_ptr_t | a_endpoint_ptr | ) |
Add an asynchronous child endpoint.
Definition at line 122 of file service.cc.
| bool add_child | ( | endpoint_ptr_t | a_endpoint_ptr | ) |
Add a synchronous child endpoint.
Definition at line 107 of file service.cc.
|
protectedvirtual |
Definition at line 386 of file service.cc.
|
privatevirtual |
Definition at line 581 of file service.cc.
| bool listen | ( | ) |
Starts listening on the queue for receiving messages. If no queue was created, this does nothing. If this returns false, the service should quit with an error
Definition at line 230 of file service.cc.
|
virtual |
Waits for AMQP messages arriving on the channel Returns false if the return is due to an error in this function; returns true otherwise (namely because it was canceled)
Implements listener.
Definition at line 454 of file service.cc.
|
protectedvirtual |
Default request handler; passes request to initial request functions.
Reimplemented from endpoint.
Definition at line 556 of file service.cc.
|
protectedvirtual |
Definition at line 353 of file service.cc.
Definition at line 87 of file service.cc.
|
protectedvirtual |
Definition at line 440 of file service.cc.
|
virtual |
Runs the service, which consists of three stages:
Override this to customize when happens when a service runs.
Definition at line 142 of file service.cc.
|
inlinevirtual |
|
inlinevirtual |
|
inlinevirtual |
Sends a request message and returns a channel on which to listen for a reply.
Reimplemented from core.
Definition at line 227 of file service.hh.
|
virtual |
|
protectedvirtual |
Definition at line 370 of file service.cc.
| snake_case_mv_accessible | ( | bool | , |
| enable_scheduling | |||
| ) |
| snake_case_mv_accessible | ( | bool | , |
| restart_on_error | |||
| ) |
| snake_case_mv_accessible | ( | uuid_t | , |
| id | |||
| ) |
| snake_case_mv_referrable | ( | async_map_t | , |
| async_children | |||
| ) |
| snake_case_mv_referrable | ( | scarab::authentication | , |
| auth | |||
| ) |
| snake_case_mv_referrable | ( | std::string | , |
| broadcast_key | |||
| ) |
| snake_case_mv_referrable | ( | sync_map_t | , |
| sync_children | |||
| ) |
| bool start | ( | ) |
Creates a channel to the broker and establishes the queue for receiving messages. If no queue name was given, this does nothing. If this returns false, the service should quit with an error
Definition at line 192 of file service.cc.
|
protectedvirtual |
Definition at line 411 of file service.cc.
| bool stop | ( | ) |
Stops receiving messages and closes the connection to the broker. If no queue was created, this does nothing. If this returns false, the service should quit with an error
Definition at line 328 of file service.cc.
|
protectedvirtual |
Definition at line 426 of file service.cc.
|
protectedvirtual |
Implementation of submit_message (from concurrent_receiver)
Implements concurrent_receiver.
Definition at line 511 of file service.cc.