Dripline-Cpp  v2.10.11
Dripline Implementation in C++
endpoint.cc
Go to the documentation of this file.
1 /*
2  * endpoint.cc
3  *
4  * Created on: Aug 14, 2018
5  * Author: N.S. Oblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "endpoint.hh"
11 
12 #include "dripline_exceptions.hh"
13 #include "service.hh"
14 #include "throw_reply.hh"
15 
16 #include "logger.hh"
17 
18 #ifdef DL_PYTHON
19 #include "reply_cache.hh"
20 
21 #include "pybind11/pybind11.h"
22 #include "pybind11/pytypes.h"
23 #endif
24 
25 LOGGER( dlog, "endpoint" );
26 
27 namespace dripline
28 {
29 
30  endpoint::endpoint( const std::string& a_name ) :
31  f_name( a_name ),
32  f_service( nullptr ),
33  f_lockout_tag(),
34  f_lockout_key( generate_nil_uuid() )
35  {
36  }
37 
39  {
40  if( f_service == nullptr ) throw dripline_error() << "Parent service pointer for endpoint <" << f_name << "> is null";
41  return *f_service;
42  }
43 
44  const service& endpoint::parent() const
45  {
46  if( f_service == nullptr ) throw dripline_error() << "Parent service pointer for endpoint <" << f_name << "> is null";
47  return *f_service;
48  }
49 
51  {
52  return this->on_request_message( a_request_ptr );;
53  }
54 
56  {
57  return this->on_alert_message( a_alert_ptr );
58  }
59 
61  {
62  return this->on_reply_message( a_reply_ptr );
63  }
64 
66  {
67  // reply object to store whatever reply we end up with
68  reply_ptr_t t_reply;
69 
70  // lambda to send the reply. this local function is defined so we can send from within the catch block if needed before rethrowing.
71  auto t_replier = [&t_reply, &a_request, this](){
72  // send the reply if the request had a reply-to
73  if( a_request->reply_to().empty() )
74  {
75  LWARN( dlog, "Not sending reply (reply-to empty)\n" <<
76  " Return code: " << t_reply->get_return_code() << '\n' <<
77  " Return message: " << t_reply->return_message() << '\n' <<
78  " Payload:\n" << t_reply->payload() );
79  }
80  else
81  {
82  send_reply( t_reply );
83  }
84  };
85 
86  try
87  {
88  if( ! a_request->get_is_valid() )
89  {
90  std::string t_message( "Request message was not valid" );
91  // check in the payload for error information
92  if( a_request->payload().is_node() )
93  {
94  const scarab::param_node& t_payload = a_request->payload().as_node();
95  if( t_payload.has("error") ) t_message += "; " + t_payload["error"]().as_string();
96  }
97  throw throw_reply( dl_service_error_decoding_fail{}, a_request->get_payload_ptr()->clone() ) << "Request message was not valid";
98  }
99 
100  // the lockout key must be valid
101  if( ! a_request->get_lockout_key_valid() )
102  {
103  throw throw_reply( dl_service_error_invalid_key{} ) << "Lockout key could not be parsed";
104  }
105 
106  switch( a_request->get_message_operation() )
107  {
108  case op_t::get:
109  {
110  t_reply = __do_get_request( a_request );
111  break;
112  } // end "get" operation
113  case op_t::set:
114  {
115  t_reply = __do_set_request( a_request );
116  break;
117  } // end "set" operation
118  case op_t::cmd:
119  {
120  t_reply = __do_cmd_request( a_request );
121  break;
122  }
123  default:
124  throw throw_reply( dl_service_error_invalid_method() ) << "Unrecognized message operation: <" << a_request->get_message_operation() << ">";
125  break;
126  } // end switch on message type
127  // reply to be sent outside the try block
128  }
129  catch( const throw_reply& e )
130  {
131  if( e.ret_code().rc_value() == dl_success::s_value )
132  {
133  LINFO( dlog, "Replying with: " << e.return_message() );
134  }
135  else
136  {
137  LWARN( dlog, "Replying with: " << e.return_message() );
138  }
139  t_reply = a_request->reply( e.ret_code(), e.return_message() );
140  t_reply->set_payload( e.get_payload_ptr()->clone() );
141  // don't rethrow a throw_reply
142  // reply to be sent outside the catch block
143  }
144 #ifdef DL_PYTHON
145  catch( const pybind11::error_already_set& e )
146  {
147  // check whether the error message from python starts with the keyword
148  // the keyword should be the name of the python class
149  if( std::string(e.what()).substr(0, throw_reply::py_throw_reply_keyword().size()) == throw_reply::py_throw_reply_keyword() )
150  {
151  reply_cache* t_reply_cache = reply_cache::get_instance();
152  if( t_reply_cache->ret_code().rc_value() == dl_success::s_value )
153  {
154  LINFO( dlog, "Replying with: " << t_reply_cache->return_message() );
155  }
156  else
157  {
158  LWARN( dlog, "Replying with: " << t_reply_cache->return_message() );
159  }
160  t_reply = a_request->reply( t_reply_cache->ret_code(),t_reply_cache->return_message() );
161  t_reply->set_payload( t_reply_cache->get_payload_ptr()->clone() );
162  // don't rethrow a throw_reply
163  // reply to be sent outside the catch block
164  }
165  else
166  {
167  // treat the python exception as a standard exception
168  LERROR( dlog, "Caught exception from Python: " << e.what() );
169  t_reply = a_request->reply( dl_unhandled_exception(), e.what() );
170  t_replier(); // send the reply before rethrowing
171  throw; // unhandled exceptions should rethrow because they're by definition unhandled
172  }
173  }
174 #endif
175  catch( const std::exception& e )
176  {
177  LERROR( dlog, "Caught exception: " << e.what() );
178  t_reply = a_request->reply( dl_unhandled_exception(), e.what() );
179  t_replier(); // send the reply before rethrowing
180  throw; // unhandled exceptions should rethrow because they're by definition unhandled
181  }
182 
183  // send the reply
184  t_replier();
185 
186  return t_reply;
187  }
188 
190  {
191  if( a_message->is_request() )
192  {
193  on_request_message( std::static_pointer_cast< msg_request >( a_message ) );
194  }
195  else if( a_message->is_alert() )
196  {
197  on_alert_message( std::static_pointer_cast< msg_alert >( a_message ) );
198  }
199  else if( a_message->is_reply() )
200  {
201  on_reply_message( std::static_pointer_cast< msg_reply >( a_message ) );
202  }
203  else
204  {
205  throw dripline_error() << "Unknown message type";
206  }
207  }
208 
209  void endpoint::send_reply( reply_ptr_t a_reply ) const
210  {
211  if( ! f_service )
212  {
213  LWARN( dlog, "Cannot send reply because the service pointer is not set" );
214  return;
215  }
216 
217  LDEBUG( dlog, "Sending reply message to <" << a_reply->routing_key() << ">:\n" <<
218  " Return code: " << a_reply->get_return_code() << '\n' <<
219  " Return message: " << a_reply->return_message() << '\n' <<
220  " Payload:\n" << a_reply->payload() );
221 
222  sent_msg_pkg_ptr t_receive_reply;
223  try
224  {
225  t_receive_reply = f_service->send( a_reply );
226  }
227  catch( message_ptr_t )
228  {
229  LWARN( dlog, "Operating in offline mode; message not sent" );
230  return;
231  }
232  catch( connection_error& e )
233  {
234  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() );
235  return;
236  }
237  catch( dripline_error& e )
238  {
239  LERROR( dlog, "Dripline error while sending reply:\n" << e.what() );
240  return;
241  }
242 
243  if( ! t_receive_reply->f_successful_send )
244  {
245  LERROR( dlog, "Failed to send reply:\n" + t_receive_reply->f_send_error_message );
246  return;
247  }
248 
249  return;
250  }
251 
253  {
254  throw dripline_error() << "Base endpoint does not handle reply messages";
255  }
256 
258  {
259  throw dripline_error() << "Base endpoint does not handle alert messages";
260  }
261 
263  {
264  LDEBUG( dlog, "Run operation request received" );
265 
266  if( ! authenticate( a_request->lockout_key() ) )
267  {
268  std::stringstream t_conv;
269  t_conv << a_request->lockout_key();
270  std::string t_message( "Request denied due to lockout (key used: " + t_conv.str() + ")" );
271  LINFO( dlog, t_message );
272  return a_request->reply( dl_service_error_access_denied(), t_message );
273  }
274 
275  return do_run_request( a_request );
276  }
277 
279  {
280  LDEBUG( dlog, "Get operation request received" );
281 
282  std::string t_query_type;
283  if( ! a_request->parsed_specifier().empty() )
284  {
285  t_query_type = a_request->parsed_specifier().front();
286  }
287 
288  if( t_query_type == "is-locked" )
289  {
290  a_request->parsed_specifier().pop_front();
291  return handle_is_locked_request( a_request );
292  }
293 
294  return do_get_request( a_request );
295  }
296 
298  {
299  LDEBUG( dlog, "Set request received" );
300 
301  if( ! authenticate( a_request->lockout_key() ) )
302  {
303  std::stringstream t_conv;
304  t_conv << a_request->lockout_key();
305  std::string t_message( "Request denied due to lockout (key used: " + t_conv.str() + ")" );
306  LINFO( dlog, t_message );
307  return a_request->reply( dl_service_error_access_denied(), t_message );
308  }
309 
310  return do_set_request( a_request );
311  }
312 
314  {
315  LDEBUG( dlog, "Cmd request received" );
316 
317  std::string t_instruction;
318  if( ! a_request->parsed_specifier().empty() )
319  {
320  t_instruction = a_request->parsed_specifier().front();
321  }
322 
323  //LWARN( mtlog, "uuid string: " << a_request->get_payload().get_value( "key", "") << ", uuid: " << uuid_from_string( a_request->get_payload().get_value( "key", "") ) );
324  // this condition includes the exception for the unlock instruction that allows us to force the unlock regardless of the key.
325  // disable_key() checks the lockout key if it's not forced, so it's okay that we bypass this call to authenticate() for the unlock instruction.
326  if( ! authenticate( a_request->lockout_key() ) && t_instruction != "unlock" && t_instruction != "ping" && t_instruction != "set_condition" )
327  {
328  std::stringstream t_conv;
329  t_conv << a_request->lockout_key();
330  std::string t_message( "Request denied due to lockout (key used: " + t_conv.str() + ")" );
331  LINFO( dlog, t_message );
332  return a_request->reply( dl_service_error_access_denied(), t_message );
333  }
334 
335  if( t_instruction == "lock" )
336  {
337  a_request->parsed_specifier().pop_front();
338  return handle_lock_request( a_request );
339  }
340  else if( t_instruction == "unlock" )
341  {
342  a_request->parsed_specifier().pop_front();
343  return handle_unlock_request( a_request );
344  }
345  else if( t_instruction == "ping" )
346  {
347  a_request->parsed_specifier().pop_front();
348  return handle_ping_request( a_request );
349  }
350  else if( t_instruction == "set_condition" )
351  {
352  a_request->parsed_specifier().pop_front();
353  return handle_set_condition_request( a_request );
354  }
355 
356  return do_cmd_request( a_request );
357  }
358 
359  uuid_t endpoint::enable_lockout( const scarab::param_node& a_tag, uuid_t a_key )
360  {
361  if( is_locked() ) return generate_nil_uuid();
362  if( a_key.is_nil() ) f_lockout_key = generate_random_uuid();
363  else f_lockout_key = a_key;
364  f_lockout_tag = a_tag;
365  return f_lockout_key;
366  }
367 
368  bool endpoint::disable_lockout( const uuid_t& a_key, bool a_force )
369  {
370  if( ! is_locked() ) return true;
371  if( ! a_force && a_key != f_lockout_key ) return false;
372  f_lockout_key = generate_nil_uuid();
373  f_lockout_tag.clear();
374  return true;
375  }
376 
377  bool endpoint::authenticate( const uuid_t& a_key ) const
378  {
379  LDEBUG( dlog, "Authenticating with key <" << a_key << ">" );
380  if( is_locked() ) return check_key( a_key );
381  return true;
382  }
383 
385  {
386  uuid_t t_new_key = enable_lockout( a_request->get_sender_info(), a_request->lockout_key() );
387  if( t_new_key.is_nil() )
388  {
389  return a_request->reply( dl_resource_error(), "Unable to lock server" );;
390  }
391 
392  scarab::param_ptr_t t_payload_ptr( new scarab::param_node() );
393  scarab::param_node& t_payload_node = t_payload_ptr->as_node();
394  t_payload_node.add( "key", string_from_uuid( t_new_key ) );
395  return a_request->reply( dl_success(), "Server is now locked", std::move(t_payload_ptr) );
396  }
397 
399  {
400  if( ! is_locked() )
401  {
402  return a_request->reply( dl_warning_no_action_taken(), "Already unlocked" );
403  }
404 
405  bool t_force = a_request->payload().get_value( "force", false );
406 
407  if( disable_lockout( a_request->lockout_key(), t_force ) )
408  {
409  return a_request->reply( dl_success(), "Server unlocked" );
410  }
411  return a_request->reply( dl_resource_error(), "Failed to unlock server" );;
412  }
413 
415  {
416  return this->__do_handle_set_condition_request( a_request );
417  }
418 
420  {
421  bool t_is_locked = is_locked();
422  scarab::param_ptr_t t_reply_payload( new scarab::param_node() );
423  scarab::param_node& t_reply_node = t_reply_payload->as_node();
424  t_reply_node.add( "is_locked", t_is_locked );
425  if( t_is_locked ) t_reply_node.add( "tag", f_lockout_tag );
426  return a_request->reply( dl_success(), "Checked lock status", std::move(t_reply_payload) );
427  }
428 
430  {
431  return a_request->reply( dl_success(), "Hello, " + a_request->sender_exe() );
432  }
433 
434 } /* namespace dripline */
Error indicating a problem with the connection to the broker.
Dripline-specific errors.
virtual reply_ptr_t on_request_message(const request_ptr_t a_request)
Default request handler; passes request to initial request functions.
Definition: endpoint.cc:65
bool disable_lockout(const uuid_t &a_key, bool a_force=false)
Definition: endpoint.cc:368
reply_ptr_t handle_lock_request(const request_ptr_t a_request)
Definition: endpoint.cc:384
reply_ptr_t handle_ping_request(const request_ptr_t a_request)
Definition: endpoint.cc:429
reply_ptr_t __do_get_request(const request_ptr_t a_request)
Definition: endpoint.cc:278
bool is_locked() const
Definition: endpoint.hh:240
virtual reply_ptr_t __do_handle_set_condition_request(const request_ptr_t a_request)
Default set-condition: no action taken; override for different behavior.
Definition: endpoint.hh:250
endpoint(const std::string &a_name)
Definition: endpoint.cc:30
virtual void send_reply(reply_ptr_t a_reply) const
Definition: endpoint.cc:209
reply_ptr_t handle_unlock_request(const request_ptr_t a_request)
Definition: endpoint.cc:398
virtual reply_ptr_t do_cmd_request(const request_ptr_t a_request)
Definition: endpoint.hh:230
virtual void on_alert_message(const alert_ptr_t a_alert)
Definition: endpoint.cc:257
reply_ptr_t __do_set_request(const request_ptr_t a_request)
Definition: endpoint.cc:297
reply_ptr_t handle_set_condition_request(const request_ptr_t a_request)
Definition: endpoint.cc:414
void sort_message(const message_ptr_t a_request)
Definition: endpoint.cc:189
service & parent()
Definition: endpoint.cc:38
bool authenticate(const uuid_t &a_key) const
Returns true if the server is unlocked or if it's locked and the key matches the lockout key; returns...
Definition: endpoint.cc:377
void submit_reply_message(const reply_ptr_t a_reply)
Directly submit a reply message to this endpoint.
Definition: endpoint.cc:60
virtual reply_ptr_t do_get_request(const request_ptr_t a_request)
Definition: endpoint.hh:220
reply_ptr_t __do_run_request(const request_ptr_t a_request)
Definition: endpoint.cc:262
bool check_key(const uuid_t &a_key) const
Definition: endpoint.hh:245
reply_ptr_t __do_cmd_request(const request_ptr_t a_request)
Definition: endpoint.cc:313
void submit_alert_message(const alert_ptr_t a_alert)
Directly submit an alert message to this endpoint.
Definition: endpoint.cc:55
uuid_t enable_lockout(const scarab::param_node &a_tag)
enable lockout with randomly-generated key
Definition: endpoint.hh:235
virtual reply_ptr_t do_run_request(const request_ptr_t a_request)
Definition: endpoint.hh:215
reply_ptr_t submit_request_message(const request_ptr_t a_request)
Directly submit a request message to this endpoint.
Definition: endpoint.cc:50
virtual void on_reply_message(const reply_ptr_t a_reply)
Definition: endpoint.cc:252
reply_ptr_t handle_is_locked_request(const request_ptr_t a_request)
Definition: endpoint.cc:419
virtual reply_ptr_t do_set_request(const request_ptr_t a_request)
Definition: endpoint.hh:225
A singleton throw_reply object used to transfer throw_reply information to C++ from other implementat...
Definition: reply_cache.hh:38
Primary unit of software that connects to a broker and typically provides an interface with an instru...
Definition: service.hh:85
Object that can be thrown while processing a request to send a reply.
Definition: throw_reply.hh:42
const return_code & ret_code() const noexcept
Definition: throw_reply.hh:111
const scarab::param_ptr_t & get_payload_ptr() const noexcept
Definition: throw_reply.hh:138
const std::string & return_message() const noexcept
Definition: throw_reply.hh:101
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("endpoint", __FILE_NAME__, __LINE__)
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
Definition: uuid.cc:78
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
Definition: uuid.cc:19
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
uuid_t generate_nil_uuid()
Generates a UUID containing all 0s.
Definition: uuid.cc:26
boost::uuids::uuid uuid_t
Universally-unique-identifier type containing 16 hexadecimal characters.
Definition: uuid.hh:26
static unsigned s_value
virtual unsigned rc_value() const =0