Dripline-Cpp  v2.10.11
Dripline Implementation in C++
message.cc
Go to the documentation of this file.
1 /*
2  * message.cc
3  *
4  * Created on: Jul 9, 2015
5  * Author: nsoblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "message.hh"
11 
12 #include "dripline_constants.hh"
13 #include "dripline_exceptions.hh"
14 #include "dripline_version.hh"
15 #include "version_store.hh"
16 
17 #include "logger.hh"
18 #include "map_at_default.hh"
19 #include "param_json.hh"
20 #include "return_codes.hh"
21 #include "time.hh"
22 #include "version_wrapper.hh"
23 
24 #include <cmath>
25 #include <map>
26 
27 using std::shared_ptr;
28 using std::make_shared;
29 using std::string;
30 
31 using scarab::param;
32 using scarab::param_node;
33 using scarab::param_value;
34 using scarab::param_input_json;
35 using scarab::param_output_json;
36 using scarab::param_ptr_t;
37 
38 using std::string;
39 
40 namespace dripline
41 {
42 
43  LOGGER( dlog, "message" );
44 
45 
46  //***********
47  // Message
48  //***********
49 
51  f_version(),
52  f_commit(),
53  f_package()
54  {}
55 
56  message::sender_package_version::sender_package_version( const scarab::version_semantic& a_version ) :
57  f_version( a_version.version_str() ),
58  f_commit( a_version.commit() ),
59  f_package( a_version.package() )
60  {}
61 
62  message::sender_package_version::sender_package_version( const std::string& a_version, const std::string& a_commit, const std::string& a_package ) :
63  f_version( a_version ),
64  f_commit( a_commit ),
65  f_package( a_package )
66  {}
67 
69  {
70  return f_version == a_rhs.f_version &&
71  f_commit == a_rhs.f_commit &&
72  f_package == a_rhs.f_package;
73  }
74 
76  f_is_valid( true ),
77  f_routing_key(),
78  f_correlation_id(),
79  f_message_id(),
80  f_reply_to(),
81  f_encoding( encoding::json ),
82  f_timestamp(),
83  f_sender_exe( "N/A" ),
84  f_sender_hostname( "N/A" ),
85  f_sender_username( "N/A" ),
86  f_sender_versions(),
87  f_specifier(),
88  f_payload( new param() )
89  {
90  // set the sender_info correctly for the server software
91  scarab::version_wrapper* t_version = scarab::version_wrapper::get_instance();
92  f_sender_exe = t_version->exe_name();
93  f_sender_hostname = t_version->hostname();
94  f_sender_username = t_version->username();
95  f_sender_service_name = "unknown";
96 
97  auto t_versions = version_store::get_instance()->versions();
98  for( auto& i_version : t_versions )
99  {
100  f_sender_versions.emplace( std::make_pair( i_version.first, *i_version.second ) );
101  }
102  }
103 
104 /*
105  message_ptr_t message::process_envelope( amqp_envelope_ptr a_envelope )
106  {
107  if( ! a_envelope )
108  {
109  throw dripline_error() << "Empty envelope received";
110  }
111  return message::process_message( a_envelope->Message(), a_envelope->RoutingKey() );
112  }
113 */
114  std::tuple< std::string, unsigned, unsigned > message::parse_message_id( const string& a_message_id )
115  {
116  std::string::size_type t_first_separator = a_message_id.find_first_of( s_message_id_separator );
117  std::string::size_type t_last_separator = a_message_id.find_last_of( s_message_id_separator );
118  if( t_first_separator == a_message_id.npos || t_last_separator == a_message_id.npos )
119  {
120  throw dripline_error() << "Message ID is not formatted correctly\nShould be [UUID]/[chunk]/[total chunks]\nReceived: " << a_message_id;
121  }
122 
123  return std::make_tuple( a_message_id.substr(0, t_first_separator),
124  std::stoul(a_message_id.substr(t_first_separator + 1, t_last_separator - t_first_separator - 1)),
125  std::stoul(a_message_id.substr(t_last_separator + 1)) );
126  }
127 
128  message_ptr_t message::process_message( amqp_split_message_ptrs a_message_ptrs, const std::string& a_routing_key )
129  {
130  // find first non-empty message pointer
131  // get length of payload (to use for empty ones)
132  // get header content
133  // set bool for complete to true
134  // loop through messages to build full payload string, if one is incomplete, fill with hashes and set bool for complete to false
135  // if payload complete, parse payload
136 
137  if( a_message_ptrs.empty() )
138  {
139  throw dripline_error() << "No messages were provided for processing";
140  }
141 
142  // Find the first valid message, from which we'll get the payload chunk length
143  // Below we'll also use this for the header content
144  amqp_message_ptr t_first_valid_message;
145  for( unsigned i_message = 0; ! t_first_valid_message && i_message < a_message_ptrs.size(); ++i_message )
146  {
147  t_first_valid_message = a_message_ptrs[i_message];
148  }
149 
150  if( ! t_first_valid_message )
151  {
152  throw dripline_error() << "All messages provided for processing were invalid";
153  }
154 
155  unsigned t_payload_chunk_length = t_first_valid_message->Body().size();
156 
157  encoding t_encoding;
158  if( t_first_valid_message->ContentEncoding() == "application/json" )
159  {
160  t_encoding = encoding::json;
161  }
162  else
163  {
164  throw dripline_error() << "Unable to parse message with content type <" << t_first_valid_message->ContentEncoding() << ">";
165  }
166 
167  // Build up the body
168  string t_payload_str;
169  bool t_payload_is_complete = true;
170  for( amqp_message_ptr& t_message : a_message_ptrs )
171  {
172  if( ! t_message )
173  {
174  // If a chunk of the message is missing, it's filled with hashes
175  t_payload_is_complete = false;
176  t_payload_str += string( t_payload_chunk_length, '#' );
177  continue;
178  }
179 
180  t_payload_str += t_message->Body();
181  }
182 
183  // Attempt to parse
184  scarab::param_ptr_t t_payload;
185  string t_payload_error_msg;
186  if( t_payload_is_complete )
187  {
188  // Parse the body
189  param_input_json t_input;
190  t_payload = t_input.read_string( t_payload_str );
191  if( ! t_payload )
192  {
193  t_payload_error_msg = "Message body could not be parsed; skipping request";
194  }
195  }
196  else
197  {
198  t_payload_error_msg = "Entire message was not available";
199  }
200 
201  // Payload is unavailable if the error message is non-empty
202  // In that case, store whatever we have for the payload string in the payload, plus the error message
203  if( ! t_payload_error_msg.empty() )
204  {
205  // Store the invalid payload string in the payload
206  t_payload = std::unique_ptr< scarab::param_node >( new param_node() );
207  t_payload->as_node().add( "invalid", t_payload_str );
208  t_payload->as_node().add( "error", t_payload_error_msg );
209  }
210 
211  LDEBUG( dlog, "Processing message:\n" <<
212  "Routing key: " << a_routing_key << '\n' <<
213  "Payload: " << t_payload_str );
214 
215  using scarab::at;
216 
217  using AmqpClient::Table;
218  using AmqpClient::TableEntry;
219  using AmqpClient::TableValue;
220  Table t_properties = t_first_valid_message->HeaderTable();
221 
222  // Create the message, of whichever type
223  message_ptr_t t_message;
224  msg_t t_msg_type = to_msg_t( at( t_properties, std::string("message_type"), TableValue(to_uint(msg_t::unknown)) ).GetInteger() );
225  switch( t_msg_type )
226  {
227  case msg_t::request:
228  {
230  std::move(t_payload),
231  to_op_t( at( t_properties, std::string("message_operation"), TableValue(to_uint(op_t::unknown)) ).GetInteger() ),
232  a_routing_key,
233  at( t_properties, std::string("specifier"), TableValue("") ).GetString(),
234  t_first_valid_message->ReplyTo(),
235  t_encoding);
236 
237  bool t_lockout_key_valid = true;
238  t_request->lockout_key() = uuid_from_string( at( t_properties, std::string("lockout_key"), TableValue("") ).GetString(), t_lockout_key_valid );
239  t_request->set_lockout_key_valid( t_lockout_key_valid );
240 
241  t_message = t_request;
242  break;
243  }
244  case msg_t::reply:
245  {
246  reply_ptr_t t_reply = msg_reply::create(
247  at( t_properties, std::string("return_code"), TableValue(999U) ).GetInteger(),
248  at( t_properties, std::string("return_message"), TableValue("") ).GetString(),
249  std::move(t_payload),
250  a_routing_key,
251  at( t_properties, std::string("specifier"), TableValue("") ).GetString(),
252  t_encoding);
253 
254  t_message = t_reply;
255  break;
256  }
257  case msg_t::alert:
258  {
259  alert_ptr_t t_alert = msg_alert::create(
260  std::move(t_payload),
261  a_routing_key,
262  at( t_properties, std::string("specifier"), TableValue("") ).GetString(),
263  t_encoding);
264 
265  t_message = t_alert;
266  break;
267  }
268  default:
269  {
270  throw dripline_error() << "Message received with unhandled type: " << t_msg_type;
271  break;
272  }
273  }
274 
275  // Set message header fields
276  if( ! t_payload_error_msg.empty() )
277  {
278  t_message->set_is_valid( false );
279  }
280 
281  t_message->correlation_id() = t_first_valid_message->CorrelationId();
282  t_message->message_id() = t_first_valid_message->MessageId();
283  // remove the message chunk information from the message id
284  t_message->message_id() = t_message->message_id().substr( 0, t_message->message_id().find_first_of(s_message_id_separator) );
285  t_message->timestamp() = at( t_properties, std::string("timestamp"), TableValue("") ).GetString();
286 
287  Table t_sender_info = at( t_properties, std::string("sender_info"), TableValue(Table()) ).GetTable();
288  scarab::param_ptr_t t_sender_info_param = table_to_param( t_sender_info );
289  t_message->set_sender_info( t_sender_info_param->as_node() );
290 
291  t_message->payload() = *t_payload;
292 
293  return t_message;
294  }
295 
297  {
298  f_timestamp = scarab::get_formatted_now();
299 
300  try
301  {
302  std::vector< string > t_body_parts;
303  encode_message_body( t_body_parts, a_max_size );
304 
305  unsigned t_n_chunks = t_body_parts.size();
306  std::vector< amqp_message_ptr > t_message_parts( t_n_chunks );
307 
308  if( f_message_id.empty() )
309  {
310  f_message_id = string_from_uuid( generate_random_uuid() );
311  }
312  string t_base_message_id = f_message_id + s_message_id_separator;
313  string t_total_chunks_str = s_message_id_separator + std::to_string(t_n_chunks);
314 
315  unsigned i_chunk = 0;
316  for( string& t_body_part : t_body_parts )
317  {
318  amqp_message_ptr t_message = AmqpClient::BasicMessage::Create( t_body_part );
319 
320  t_message->ContentEncoding( interpret_encoding() );
321  t_message->CorrelationId( f_correlation_id );
322  t_message->MessageId( t_base_message_id + std::to_string(i_chunk) + t_total_chunks_str );
323  t_message->ReplyTo( f_reply_to );
324 
325  AmqpClient::Table t_properties;
326  t_properties.insert( AmqpClient::TableEntry( "message_type", to_uint(message_type()) ) );
327  t_properties.insert( AmqpClient::TableEntry( "specifier", f_specifier.to_string() ) );
328  t_properties.insert( AmqpClient::TableEntry( "timestamp", f_timestamp ) );
329  t_properties.insert( AmqpClient::TableEntry( "sender_info", param_to_table( get_sender_info() ) ) );
330 
331  this->derived_modify_amqp_message( t_message, t_properties );
332 
333  t_message->HeaderTable( t_properties );
334 
335  t_message_parts[i_chunk] = t_message;
336 
337  ++i_chunk;
338  }
339 
340  return t_message_parts;
341  }
342  catch( dripline_error& e )
343  {
344  LERROR( dlog, e.what() );
345  return std::vector< amqp_message_ptr >();
346  }
347  }
348 
349  void message::encode_message_body( std::vector< string >& a_body_vec, unsigned a_max_size, const scarab::param_node& a_options ) const
350  {
351  switch( f_encoding )
352  {
353  case encoding::json:
354  {
355  string t_body;
356  param_output_json t_output;
357  if( ! t_output.write_string( *f_payload, t_body, a_options ) )
358  {
359  throw dripline_error() << "Could not convert message body to string";
360  }
361 
362  unsigned t_chars_per_chunk = a_max_size / sizeof(string::value_type);
363  unsigned t_n_chunks = std::ceil( double(t_body.size()) / double(t_chars_per_chunk) );
364  a_body_vec.resize( t_n_chunks );
365  for( unsigned i_chunk = 0, pos = 0; pos < t_body.size(); pos += t_chars_per_chunk, ++i_chunk )
366  {
367  a_body_vec[i_chunk] = t_body.substr(pos, t_chars_per_chunk );
368  }
369  break;
370  }
371  default:
372  throw dripline_error() << "Cannot encode using <" << interpret_encoding() << "> (" << f_encoding << ")";
373  break;
374  }
375  return;
376  }
377 
378  std::string message::encode_full_message( unsigned a_max_size, const scarab::param_node& a_options ) const
379  {
380  switch( f_encoding )
381  {
382  case encoding::json:
383  {
384  param_node t_message_node = get_message_param();
385  param_output_json t_output;
386  string t_message_string;
387  if( ! t_output.write_string( t_message_node, t_message_string, a_options ) )
388  {
389  throw dripline_error() << "Could not convert message to string";
390  }
391  if( t_message_string.size() > a_max_size ) t_message_string.resize( a_max_size );
392  return t_message_string;
393  break;
394  }
395  default:
396  throw dripline_error() << "Cannot encode using <" << interpret_encoding() << ">(" << f_encoding << ")";
397  break;
398  }
399  }
400 
402  {
403  switch( f_encoding )
404  {
405  case encoding::json:
406  return string( "application/json" );
407  break;
408  default:
409  return string( "Unknown" );
410  }
411  }
412 
413  param_node message::get_sender_info() const
414  {
415  param_node t_sender_info;
416  t_sender_info.add( "exe", f_sender_exe );
417  param_node t_versions;
418  for( auto& i_version : f_sender_versions )
419  {
420  param_node t_version_info;
421  t_version_info.add( "version", i_version.second.f_version );
422  if( ! i_version.second.f_commit.empty() ) t_version_info.add( "commit", i_version.second.f_commit );
423  if( ! i_version.second.f_package.empty() ) t_version_info.add( "package", i_version.second.f_package );
424  t_versions.add( i_version.first, std::move(t_version_info) );
425  }
426  t_sender_info.add( "versions", std::move(t_versions) );
427  t_sender_info.add( "hostname", f_sender_hostname );
428  t_sender_info.add( "username", f_sender_username );
429  t_sender_info.add( "service_name", f_sender_service_name );
430  return t_sender_info;
431  }
432 
433  void message::set_sender_info( const param_node& a_sender_info )
434  {
435  f_sender_exe = a_sender_info["exe"]().as_string();
436  const param_node& t_versions = a_sender_info["versions"].as_node();
437  f_sender_versions.clear();
438  for( auto i_version = t_versions.begin(); i_version != t_versions.end(); ++i_version )
439  {
440  f_sender_versions.insert( std::make_pair(i_version.name(), sender_package_version(
441  (*i_version)["version"]().as_string(),
442  i_version->get_value("commit", ""),
443  i_version->get_value("package", "") ) ) );
444  }
445  f_sender_hostname = a_sender_info["hostname"]().as_string();
446  f_sender_username = a_sender_info["username"]().as_string();
447  f_sender_service_name = a_sender_info["service_name"]().as_string();
448  return;
449  }
450 
451  param_node message::get_message_param( bool a_include_payload ) const
452  {
453  param_node t_message_node;
454  t_message_node.add( "routing_key", f_routing_key );
455  t_message_node.add( "specifier", f_specifier.unparsed() );
456  t_message_node.add( "correlation_id", f_correlation_id );
457  t_message_node.add( "message_id", f_message_id );
458  t_message_node.add( "reply_to", f_reply_to );
459  t_message_node.add( "message_type", to_uint(message_type()) );
460  t_message_node.add( "encoding", interpret_encoding() );
461  t_message_node.add( "timestamp", f_timestamp );
462  t_message_node.add( "sender_info", get_sender_info() );
463  if( a_include_payload ) t_message_node.add( "payload", payload().clone() );
464  this->derived_modify_message_param( t_message_node );
465  return t_message_node;
466  }
467 
468 
469  //***********
470  // Request
471  //***********
472 
474  message(),
475  f_lockout_key( generate_nil_uuid() ),
476  f_lockout_key_valid( true ),
477  f_message_operation( op_t::unknown )
478  {
479  f_correlation_id = string_from_uuid( generate_random_uuid() );
480  }
481 
482  request_ptr_t msg_request::create( param_ptr_t a_payload, op_t a_msg_op, const std::string& a_routing_key, const std::string& a_specifier, const std::string& a_reply_to, message::encoding a_encoding )
483  {
484  request_ptr_t t_request = make_shared< msg_request >();
485  t_request->set_payload( std::move(a_payload) );
486  t_request->set_message_operation( a_msg_op );
487  t_request->routing_key() = a_routing_key;
488  t_request->parsed_specifier() = a_specifier;
489  t_request->reply_to() = a_reply_to;
490  t_request->set_encoding( a_encoding );
491  return t_request;
492  }
493 
494  msg_t msg_request::s_message_type = msg_t::request;
495 
497  {
498  return msg_request::s_message_type;
499  }
500 
501 
502  //*********
503  // Reply
504  //*********
505 
507  message(),
508  f_return_code( dl_success::s_value ),
509  f_return_message(),
510  f_return_buffer()
511  {
512  }
513 
514  reply_ptr_t msg_reply::create( const return_code& a_return_code, const std::string& a_ret_msg, param_ptr_t a_payload, const std::string& a_routing_key, const std::string& a_specifier, message::encoding a_encoding )
515  {
516  return msg_reply::create( a_return_code.rc_value(), a_ret_msg, std::move(a_payload), a_routing_key, a_specifier, a_encoding );
517  }
518 
519  reply_ptr_t msg_reply::create( unsigned a_return_code_value, const std::string& a_ret_msg, param_ptr_t a_payload, const std::string& a_routing_key, const std::string& a_specifier, message::encoding a_encoding )
520  {
521  reply_ptr_t t_reply = make_shared< msg_reply >();
522  t_reply->set_return_code( a_return_code_value );
523  t_reply->return_message() = a_ret_msg;
524  t_reply->set_payload( std::move(a_payload) );
525  t_reply->routing_key() = a_routing_key;
526  t_reply->parsed_specifier() = a_specifier;
527  t_reply->set_encoding( a_encoding );
528  return t_reply;
529  }
530 
531  msg_t msg_reply::s_message_type = msg_t::reply;
532 
534  {
535  return msg_reply::s_message_type;
536  }
537 
538 
539  //*********
540  // Alert
541  //*********
542 
543  alert_ptr_t msg_alert::create( param_ptr_t a_payload, const std::string& a_routing_key, const std::string& a_specifier, message::encoding a_encoding )
544  {
545  alert_ptr_t t_alert = make_shared< msg_alert >();
546  t_alert->set_payload( std::move(a_payload) );
547  t_alert->routing_key() = a_routing_key;
548  t_alert->parsed_specifier() = a_specifier;
549  t_alert->set_encoding( a_encoding );
550  return t_alert;
551  }
552 
554  message()
555  {
556  f_correlation_id = string_from_uuid( generate_random_uuid() );
557  }
558 
559  msg_t msg_alert::s_message_type = msg_t::alert;
560 
562  {
563  return msg_alert::s_message_type;
564  }
565 
566 
567 
568  DRIPLINE_API bool operator==( const message& a_lhs, const message& a_rhs )
569  {
570  if( a_lhs.sender_versions().size() != a_rhs.sender_versions().size() ) return false;
571  bool t_versions_are_equal = true;
572  for( auto i_version = std::make_pair(a_lhs.sender_versions().begin(), a_rhs.sender_versions().begin());
573  i_version.first != a_lhs.sender_versions().end();
574  ++i_version.first,
575  ++i_version.second
576  )
577  {
578  t_versions_are_equal = t_versions_are_equal && i_version.first->second == i_version.second->second;
579  }
580 
581  return a_lhs.routing_key() == a_rhs.routing_key() &&
582  a_lhs.correlation_id() == a_rhs.correlation_id() &&
583  a_lhs.reply_to() == a_rhs.reply_to() &&
584  a_lhs.get_encoding() == a_rhs.get_encoding() &&
585  a_lhs.timestamp() == a_rhs.timestamp() &&
586  t_versions_are_equal &&
587  a_lhs.sender_exe() == a_rhs.sender_exe() &&
588  a_lhs.sender_hostname() == a_rhs.sender_hostname() &&
589  a_lhs.sender_username() == a_rhs.sender_username() &&
590  a_lhs.sender_service_name() == a_rhs.sender_service_name() &&
591  a_lhs.parsed_specifier() == a_rhs.parsed_specifier() &&
592  a_lhs.payload().to_string() == a_rhs.payload().to_string();
593  }
594 
595  DRIPLINE_API bool operator==( const msg_request& a_lhs, const msg_request& a_rhs )
596  {
597  return operator==( static_cast< const message& >(a_lhs), static_cast< const message& >(a_rhs) ) &&
598  a_lhs.lockout_key() == a_rhs.lockout_key() &&
599  a_lhs.get_lockout_key_valid() == a_rhs.get_lockout_key_valid() &&
600  a_lhs.get_message_operation() == a_rhs.get_message_operation();
601  }
602 
603  DRIPLINE_API bool operator==( const msg_reply& a_lhs, const msg_reply& a_rhs )
604  {
605  return operator==( static_cast< const message& >(a_lhs), static_cast< const message& >(a_rhs) ) &&
606  a_lhs.get_return_code() == a_rhs.get_return_code() &&
607  a_lhs.return_message() == a_rhs.return_message();
608  }
609 
610  DRIPLINE_API bool operator==( const msg_alert& a_lhs, const msg_alert& a_rhs )
611  {
612  return operator==( static_cast< const message& >(a_lhs), static_cast< const message& >(a_rhs) );
613  }
614 
615  DRIPLINE_API std::ostream& operator<<( std::ostream& a_os, message::encoding a_enc )
616  {
617  static std::map< message::encoding, string > s_enc_strings = { { message::encoding::json, "json" } };
618  return a_os << s_enc_strings[ a_enc ];
619  }
620 
621  DRIPLINE_API std::ostream& operator<<( std::ostream& a_os, const message& a_message )
622  {
623  a_os << "Routing key: " << a_message.routing_key() << '\n';
624  a_os << "Correlation ID: " << a_message.correlation_id() << '\n';
625  a_os << "Reply To: " << a_message.reply_to() << '\n';
626  a_os << "Message Type: " << to_uint(a_message.message_type()) << " (" << to_string(a_message.message_type()) << ")\n";
627  a_os << "Encoding: " << a_message.get_encoding() << '\n';
628  a_os << "Timestamp: " << a_message.timestamp() << '\n';
629  a_os << "Sender Info:\n";
630  a_os << "\tExecutable: " << a_message.sender_exe() << '\n';
631  a_os << "\tHostname: " << a_message.sender_hostname() << '\n';
632  a_os << "\tUsername: " << a_message.sender_username() << '\n';
633  a_os << "\tService: " << a_message.sender_service_name() << '\n';
634  a_os << "\tVersions:\n";
635  for( const auto& i_version : a_message.sender_versions() )
636  {
637  a_os << "\t\t" << i_version.first << ":\n";
638  a_os << "\t\t\tVersion: " << i_version.second.f_version << '\n';
639  a_os << "\t\t\tCommit: " << i_version.second.f_commit << '\n';
640  a_os << "\t\t\tPackage: " << i_version.second.f_package << '\n';
641  }
642  a_os << "Specifier: " << a_message.parsed_specifier().unparsed() << '\n';
643  if( a_message.payload().is_node() ) a_os << "Payload: " << a_message.payload().as_node() << '\n';
644  else if( a_message.payload().is_array() ) a_os << "Payload: " << a_message.payload().as_array() << '\n';
645  else if( a_message.payload().is_value() ) a_os << "Payload: " << a_message.payload().as_value() << '\n';
646  else a_os << "Payload: null\n";
647  return a_os;
648  }
649 
650  DRIPLINE_API std::ostream& operator<<( std::ostream& a_os, const msg_request& a_message )
651  {
652  a_os << static_cast< const message& >( a_message );
653  a_os << "Lockout Key: " << a_message.lockout_key() << '\n';
654  a_os << "Lockout Key Valid: " << a_message.get_lockout_key_valid() << '\n';
655  a_os << "Message Operation: " << a_message.get_message_operation() << '\n';
656  return a_os;
657  }
658 
659  DRIPLINE_API std::ostream& operator<<( std::ostream& a_os, const msg_reply& a_message )
660  {
661  a_os << static_cast< const message& >( a_message );
662  a_os << "Return Code: " << a_message.get_return_code() << '\n';
663  a_os << "Return Message: " << a_message.return_message() << '\n';
664  return a_os;
665  }
666 
667  DRIPLINE_API std::ostream& operator<<( std::ostream& a_os, const msg_alert& a_message )
668  {
669  a_os << static_cast< const message& >( a_message );
670  return a_os;
671  }
672 
673 } /* namespace dripline */
Dripline-specific errors.
Contains all of the information common to all types of Dripline messages.
Definition: message.hh:54
specifier f_specifier
Definition: message.hh:127
static message_ptr_t process_message(amqp_split_message_ptrs a_message_ptrs, const std::string &a_routing_key)
Converts a set of AMQP messages to a Dripline message object.
Definition: message.cc:128
virtual void derived_modify_amqp_message(amqp_message_ptr a_amqp_msg, AmqpClient::Table &a_properties) const =0
std::string interpret_encoding() const
Definition: message.cc:401
scarab::param_ptr_t f_payload
Definition: message.hh:151
scarab::param_node get_sender_info() const
Creates and returns a new param_node object to contain the sender info.
Definition: message.cc:413
specifier & parsed_specifier()
Definition: message.hh:336
std::string encode_full_message(unsigned a_max_size, const scarab::param_node &a_options=scarab::param_node()) const
Converts the entire message into a single string (default encoding is JSON)
Definition: message.cc:378
virtual void derived_modify_message_param(scarab::param_node &a_message_node) const =0
amqp_split_message_ptrs create_amqp_messages(unsigned a_max_size=10000)
Converts a Dripline message object to a set of AMQP messages.
Definition: message.cc:296
scarab::param & payload()
Definition: message.hh:346
void set_sender_info(const scarab::param_node &a_sender_info)
Copies the sender info out of a param_node.
Definition: message.cc:433
static std::tuple< std::string, unsigned, unsigned > parse_message_id(const std::string &a_message_id)
Parses the message ID, which should be of the form [UUID]/[chunk]/[total chunks].
Definition: message.cc:114
virtual msg_t message_type() const =0
static const char s_message_id_separator
Definition: message.hh:154
scarab::param_node get_message_param(bool a_include_payload=true) const
Creates and returns a new param_node object to contain the full message.
Definition: message.cc:451
void encode_message_body(std::vector< std::string > &a_body_vec, unsigned a_max_size, const scarab::param_node &a_options=scarab::param_node()) const
Converts the message-body to a strings (default encoding is JSON) for creating AMQP messages.
Definition: message.cc:349
Alert message class.
Definition: message.hh:300
virtual msg_t message_type() const
Definition: message.cc:561
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
Definition: message.cc:543
Reply message class.
Definition: message.hh:240
virtual msg_t message_type() const
Definition: message.cc:533
static reply_ptr_t create(const return_code &a_return_code, const std::string &a_ret_msg, scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Create a reply message using a return_code object and manually specifying the destination.
Request message class.
Definition: message.hh:180
virtual msg_t message_type() const
Definition: message.cc:496
static request_ptr_t create(scarab::param_ptr_t a_payload, op_t a_msg_op, const std::string &a_routing_key, const std::string &a_specifier="", const std::string &a_reply_to="", message::encoding a_encoding=encoding::json)
Create a request message.
Definition: message.cc:482
std::string to_string() const
Converts specifier tokens into a single string.
Definition: specifier.cc:108
#define DRIPLINE_API
Definition: dripline_api.hh:34
AmqpClient::TableValue param_to_table(const scarab::param_node &a_node)
Definition: amqp.cc:91
std::vector< amqp_message_ptr > amqp_split_message_ptrs
Definition: amqp.hh:31
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
Definition: uuid.cc:78
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
std::ostream & operator<<(std::ostream &a_os, op_t an_op)
Pass the integer-equivalent of a message-operation enum to an ostream.
std::string to_string(op_t an_op)
Gives the human-readable version of a message operation.
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
uuid_t uuid_from_string(const std::string &a_id_str)
Definition: uuid.cc:31
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
unsigned to_uint(op_t an_op)
Convert a message-operation enum to an integer.
scarab::param_ptr_t table_to_param(const AmqpClient::Table &a_table)
Definition: amqp.cc:14
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
Definition: uuid.cc:19
op_t to_op_t(unsigned an_op_uint)
AmqpClient::BasicMessage::ptr_t amqp_message_ptr
Definition: amqp.hh:26
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
uuid_t generate_nil_uuid()
Generates a UUID containing all 0s.
Definition: uuid.cc:26
msg_t to_msg_t(unsigned a_msg_uint)
bool operator==(const message &a_lhs, const message &a_rhs)
Definition: message.cc:568
bool operator==(const sender_package_version &a_rhs) const
Definition: message.cc:68
Base class for return codes.
Definition: return_codes.hh:40
virtual unsigned rc_value() const =0