8 #define DRIPLINE_API_EXPORTS
18 #include "map_at_default.hh"
19 #include "param_json.hh"
22 #include "version_wrapper.hh"
27 using std::shared_ptr;
28 using std::make_shared;
32 using scarab::param_node;
33 using scarab::param_value;
34 using scarab::param_input_json;
35 using scarab::param_output_json;
36 using scarab::param_ptr_t;
57 f_version( a_version.version_str() ),
58 f_commit( a_version.commit() ),
59 f_package( a_version.package() )
63 f_version( a_version ),
65 f_package( a_package )
83 f_sender_exe(
"N/A" ),
84 f_sender_hostname(
"N/A" ),
85 f_sender_username(
"N/A" ),
91 scarab::version_wrapper* t_version = scarab::version_wrapper::get_instance();
92 f_sender_exe = t_version->exe_name();
93 f_sender_hostname = t_version->hostname();
94 f_sender_username = t_version->username();
95 f_sender_service_name =
"unknown";
97 auto t_versions = version_store::get_instance()->versions();
98 for(
auto& i_version : t_versions )
100 f_sender_versions.emplace( std::make_pair( i_version.first, *i_version.second ) );
118 if( t_first_separator == a_message_id.npos || t_last_separator == a_message_id.npos )
120 throw dripline_error() <<
"Message ID is not formatted correctly\nShould be [UUID]/[chunk]/[total chunks]\nReceived: " << a_message_id;
123 return std::make_tuple( a_message_id.substr(0, t_first_separator),
124 std::stoul(a_message_id.substr(t_first_separator + 1, t_last_separator - t_first_separator - 1)),
125 std::stoul(a_message_id.substr(t_last_separator + 1)) );
137 if( a_message_ptrs.empty() )
139 throw dripline_error() <<
"No messages were provided for processing";
145 for(
unsigned i_message = 0; ! t_first_valid_message && i_message < a_message_ptrs.size(); ++i_message )
147 t_first_valid_message = a_message_ptrs[i_message];
150 if( ! t_first_valid_message )
152 throw dripline_error() <<
"All messages provided for processing were invalid";
155 unsigned t_payload_chunk_length = t_first_valid_message->Body().size();
158 if( t_first_valid_message->ContentEncoding() ==
"application/json" )
164 throw dripline_error() <<
"Unable to parse message with content type <" << t_first_valid_message->ContentEncoding() <<
">";
168 string t_payload_str;
169 bool t_payload_is_complete =
true;
175 t_payload_is_complete =
false;
176 t_payload_str += string( t_payload_chunk_length,
'#' );
180 t_payload_str += t_message->Body();
184 scarab::param_ptr_t t_payload;
185 string t_payload_error_msg;
186 if( t_payload_is_complete )
189 param_input_json t_input;
190 t_payload = t_input.read_string( t_payload_str );
193 t_payload_error_msg =
"Message body could not be parsed; skipping request";
198 t_payload_error_msg =
"Entire message was not available";
203 if( ! t_payload_error_msg.empty() )
206 t_payload = std::unique_ptr< scarab::param_node >(
new param_node() );
207 t_payload->as_node().add(
"invalid", t_payload_str );
208 t_payload->as_node().add(
"error", t_payload_error_msg );
211 LDEBUG(
dlog,
"Processing message:\n" <<
212 "Routing key: " << a_routing_key <<
'\n' <<
213 "Payload: " << t_payload_str );
217 using AmqpClient::Table;
218 using AmqpClient::TableEntry;
219 using AmqpClient::TableValue;
220 Table t_properties = t_first_valid_message->HeaderTable();
230 std::move(t_payload),
233 at( t_properties, std::string(
"specifier"), TableValue(
"") ).GetString(),
234 t_first_valid_message->ReplyTo(),
237 bool t_lockout_key_valid =
true;
238 t_request->lockout_key() =
uuid_from_string( at( t_properties, std::string(
"lockout_key"), TableValue(
"") ).GetString(), t_lockout_key_valid );
239 t_request->set_lockout_key_valid( t_lockout_key_valid );
241 t_message = t_request;
247 at( t_properties, std::string(
"return_code"), TableValue(999U) ).GetInteger(),
248 at( t_properties, std::string(
"return_message"), TableValue(
"") ).GetString(),
249 std::move(t_payload),
251 at( t_properties, std::string(
"specifier"), TableValue(
"") ).GetString(),
260 std::move(t_payload),
262 at( t_properties, std::string(
"specifier"), TableValue(
"") ).GetString(),
270 throw dripline_error() <<
"Message received with unhandled type: " << t_msg_type;
276 if( ! t_payload_error_msg.empty() )
278 t_message->set_is_valid(
false );
281 t_message->correlation_id() = t_first_valid_message->CorrelationId();
282 t_message->message_id() = t_first_valid_message->MessageId();
284 t_message->message_id() = t_message->message_id().substr( 0, t_message->message_id().find_first_of(
s_message_id_separator) );
285 t_message->timestamp() = at( t_properties, std::string(
"timestamp"), TableValue(
"") ).GetString();
287 Table t_sender_info = at( t_properties, std::string(
"sender_info"), TableValue(Table()) ).GetTable();
288 scarab::param_ptr_t t_sender_info_param =
table_to_param( t_sender_info );
289 t_message->set_sender_info( t_sender_info_param->as_node() );
291 t_message->payload() = *t_payload;
298 f_timestamp = scarab::get_formatted_now();
302 std::vector< string > t_body_parts;
305 unsigned t_n_chunks = t_body_parts.size();
306 std::vector< amqp_message_ptr > t_message_parts( t_n_chunks );
308 if( f_message_id.empty() )
315 unsigned i_chunk = 0;
316 for(
string& t_body_part : t_body_parts )
318 amqp_message_ptr t_message = AmqpClient::BasicMessage::Create( t_body_part );
321 t_message->CorrelationId( f_correlation_id );
322 t_message->MessageId( t_base_message_id +
std::to_string(i_chunk) + t_total_chunks_str );
323 t_message->ReplyTo( f_reply_to );
325 AmqpClient::Table t_properties;
328 t_properties.insert( AmqpClient::TableEntry(
"timestamp", f_timestamp ) );
333 t_message->HeaderTable( t_properties );
335 t_message_parts[i_chunk] = t_message;
340 return t_message_parts;
344 LERROR(
dlog, e.what() );
345 return std::vector< amqp_message_ptr >();
356 param_output_json t_output;
357 if( ! t_output.write_string( *
f_payload, t_body, a_options ) )
359 throw dripline_error() <<
"Could not convert message body to string";
362 unsigned t_chars_per_chunk = a_max_size /
sizeof(string::value_type);
363 unsigned t_n_chunks = std::ceil(
double(t_body.size()) /
double(t_chars_per_chunk) );
364 a_body_vec.resize( t_n_chunks );
365 for(
unsigned i_chunk = 0, pos = 0; pos < t_body.size(); pos += t_chars_per_chunk, ++i_chunk )
367 a_body_vec[i_chunk] = t_body.substr(pos, t_chars_per_chunk );
385 param_output_json t_output;
386 string t_message_string;
387 if( ! t_output.write_string( t_message_node, t_message_string, a_options ) )
391 if( t_message_string.size() > a_max_size ) t_message_string.resize( a_max_size );
392 return t_message_string;
406 return string(
"application/json" );
409 return string(
"Unknown" );
415 param_node t_sender_info;
416 t_sender_info.add(
"exe", f_sender_exe );
417 param_node t_versions;
418 for(
auto& i_version : f_sender_versions )
420 param_node t_version_info;
421 t_version_info.add(
"version", i_version.second.f_version );
422 if( ! i_version.second.f_commit.empty() ) t_version_info.add(
"commit", i_version.second.f_commit );
423 if( ! i_version.second.f_package.empty() ) t_version_info.add(
"package", i_version.second.f_package );
424 t_versions.add( i_version.first, std::move(t_version_info) );
426 t_sender_info.add(
"versions", std::move(t_versions) );
427 t_sender_info.add(
"hostname", f_sender_hostname );
428 t_sender_info.add(
"username", f_sender_username );
429 t_sender_info.add(
"service_name", f_sender_service_name );
430 return t_sender_info;
435 f_sender_exe = a_sender_info[
"exe"]().as_string();
436 const param_node& t_versions = a_sender_info[
"versions"].as_node();
437 f_sender_versions.clear();
438 for(
auto i_version = t_versions.begin(); i_version != t_versions.end(); ++i_version )
441 (*i_version)[
"version"]().as_string(),
442 i_version->get_value(
"commit",
""),
443 i_version->get_value(
"package",
"") ) ) );
445 f_sender_hostname = a_sender_info[
"hostname"]().as_string();
446 f_sender_username = a_sender_info[
"username"]().as_string();
447 f_sender_service_name = a_sender_info[
"service_name"]().as_string();
453 param_node t_message_node;
454 t_message_node.add(
"routing_key", f_routing_key );
455 t_message_node.add(
"specifier",
f_specifier.unparsed() );
456 t_message_node.add(
"correlation_id", f_correlation_id );
457 t_message_node.add(
"message_id", f_message_id );
458 t_message_node.add(
"reply_to", f_reply_to );
461 t_message_node.add(
"timestamp", f_timestamp );
463 if( a_include_payload ) t_message_node.add(
"payload",
payload().clone() );
465 return t_message_node;
476 f_lockout_key_valid( true ),
485 t_request->set_payload( std::move(a_payload) );
486 t_request->set_message_operation( a_msg_op );
487 t_request->routing_key() = a_routing_key;
488 t_request->parsed_specifier() = a_specifier;
489 t_request->reply_to() = a_reply_to;
490 t_request->set_encoding( a_encoding );
498 return msg_request::s_message_type;
516 return msg_reply::create( a_return_code.
rc_value(), a_ret_msg, std::move(a_payload), a_routing_key, a_specifier, a_encoding );
522 t_reply->set_return_code( a_return_code_value );
523 t_reply->return_message() = a_ret_msg;
524 t_reply->set_payload( std::move(a_payload) );
525 t_reply->routing_key() = a_routing_key;
526 t_reply->parsed_specifier() = a_specifier;
527 t_reply->set_encoding( a_encoding );
535 return msg_reply::s_message_type;
546 t_alert->set_payload( std::move(a_payload) );
547 t_alert->routing_key() = a_routing_key;
548 t_alert->parsed_specifier() = a_specifier;
549 t_alert->set_encoding( a_encoding );
563 return msg_alert::s_message_type;
570 if( a_lhs.sender_versions().size() != a_rhs.sender_versions().size() )
return false;
571 bool t_versions_are_equal =
true;
572 for(
auto i_version = std::make_pair(a_lhs.sender_versions().begin(), a_rhs.sender_versions().begin());
573 i_version.first != a_lhs.sender_versions().end();
578 t_versions_are_equal = t_versions_are_equal && i_version.first->second == i_version.second->second;
581 return a_lhs.routing_key() == a_rhs.routing_key() &&
582 a_lhs.correlation_id() == a_rhs.correlation_id() &&
583 a_lhs.reply_to() == a_rhs.reply_to() &&
584 a_lhs.get_encoding() == a_rhs.get_encoding() &&
585 a_lhs.timestamp() == a_rhs.timestamp() &&
586 t_versions_are_equal &&
587 a_lhs.sender_exe() == a_rhs.sender_exe() &&
588 a_lhs.sender_hostname() == a_rhs.sender_hostname() &&
589 a_lhs.sender_username() == a_rhs.sender_username() &&
590 a_lhs.sender_service_name() == a_rhs.sender_service_name() &&
598 a_lhs.lockout_key() == a_rhs.lockout_key() &&
599 a_lhs.get_lockout_key_valid() == a_rhs.get_lockout_key_valid() &&
600 a_lhs.get_message_operation() == a_rhs.get_message_operation();
606 a_lhs.get_return_code() == a_rhs.get_return_code() &&
607 a_lhs.return_message() == a_rhs.return_message();
618 return a_os << s_enc_strings[ a_enc ];
623 a_os <<
"Routing key: " << a_message.routing_key() <<
'\n';
624 a_os <<
"Correlation ID: " << a_message.correlation_id() <<
'\n';
625 a_os <<
"Reply To: " << a_message.reply_to() <<
'\n';
627 a_os <<
"Encoding: " << a_message.get_encoding() <<
'\n';
628 a_os <<
"Timestamp: " << a_message.timestamp() <<
'\n';
629 a_os <<
"Sender Info:\n";
630 a_os <<
"\tExecutable: " << a_message.sender_exe() <<
'\n';
631 a_os <<
"\tHostname: " << a_message.sender_hostname() <<
'\n';
632 a_os <<
"\tUsername: " << a_message.sender_username() <<
'\n';
633 a_os <<
"\tService: " << a_message.sender_service_name() <<
'\n';
634 a_os <<
"\tVersions:\n";
635 for(
const auto& i_version : a_message.sender_versions() )
637 a_os <<
"\t\t" << i_version.first <<
":\n";
638 a_os <<
"\t\t\tVersion: " << i_version.second.f_version <<
'\n';
639 a_os <<
"\t\t\tCommit: " << i_version.second.f_commit <<
'\n';
640 a_os <<
"\t\t\tPackage: " << i_version.second.f_package <<
'\n';
643 if( a_message.
payload().is_node() ) a_os <<
"Payload: " << a_message.
payload().as_node() <<
'\n';
644 else if( a_message.
payload().is_array() ) a_os <<
"Payload: " << a_message.
payload().as_array() <<
'\n';
645 else if( a_message.
payload().is_value() ) a_os <<
"Payload: " << a_message.
payload().as_value() <<
'\n';
646 else a_os <<
"Payload: null\n";
652 a_os << static_cast< const message& >( a_message );
653 a_os <<
"Lockout Key: " << a_message.lockout_key() <<
'\n';
654 a_os <<
"Lockout Key Valid: " << a_message.get_lockout_key_valid() <<
'\n';
655 a_os <<
"Message Operation: " << a_message.get_message_operation() <<
'\n';
661 a_os << static_cast< const message& >( a_message );
662 a_os <<
"Return Code: " << a_message.get_return_code() <<
'\n';
663 a_os <<
"Return Message: " << a_message.return_message() <<
'\n';
669 a_os << static_cast< const message& >( a_message );
Dripline-specific errors.
Contains all of the information common to all types of Dripline messages.
static message_ptr_t process_message(amqp_split_message_ptrs a_message_ptrs, const std::string &a_routing_key)
Converts a set of AMQP messages to a Dripline message object.
virtual void derived_modify_amqp_message(amqp_message_ptr a_amqp_msg, AmqpClient::Table &a_properties) const =0
std::string interpret_encoding() const
scarab::param_ptr_t f_payload
scarab::param_node get_sender_info() const
Creates and returns a new param_node object to contain the sender info.
specifier & parsed_specifier()
std::string encode_full_message(unsigned a_max_size, const scarab::param_node &a_options=scarab::param_node()) const
Converts the entire message into a single string (default encoding is JSON)
virtual void derived_modify_message_param(scarab::param_node &a_message_node) const =0
amqp_split_message_ptrs create_amqp_messages(unsigned a_max_size=10000)
Converts a Dripline message object to a set of AMQP messages.
scarab::param & payload()
void set_sender_info(const scarab::param_node &a_sender_info)
Copies the sender info out of a param_node.
static std::tuple< std::string, unsigned, unsigned > parse_message_id(const std::string &a_message_id)
Parses the message ID, which should be of the form [UUID]/[chunk]/[total chunks].
virtual msg_t message_type() const =0
static const char s_message_id_separator
scarab::param_node get_message_param(bool a_include_payload=true) const
Creates and returns a new param_node object to contain the full message.
void encode_message_body(std::vector< std::string > &a_body_vec, unsigned a_max_size, const scarab::param_node &a_options=scarab::param_node()) const
Converts the message body to strings (default encoding is JSON) for creating AMQP messages.
virtual msg_t message_type() const
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
virtual msg_t message_type() const
static reply_ptr_t create(const return_code &a_return_code, const std::string &a_ret_msg, scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Create a reply message using a return_code object and manually specifying the destination.
virtual msg_t message_type() const
static request_ptr_t create(scarab::param_ptr_t a_payload, op_t a_msg_op, const std::string &a_routing_key, const std::string &a_specifier="", const std::string &a_reply_to="", message::encoding a_encoding=encoding::json)
Create a request message.
std::string to_string() const
Converts specifier tokens into a single string.
AmqpClient::TableValue param_to_table(const scarab::param_node &a_node)
std::vector< amqp_message_ptr > amqp_split_message_ptrs
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
std::shared_ptr< msg_alert > alert_ptr_t
std::ostream & operator<<(std::ostream &a_os, op_t an_op)
Pass the integer-equivalent of a message-operation enum to an ostream.
std::string to_string(op_t an_op)
Gives the human-readable version of a message operation.
std::shared_ptr< message > message_ptr_t
uuid_t uuid_from_string(const std::string &a_id_str)
std::shared_ptr< msg_reply > reply_ptr_t
unsigned to_uint(op_t an_op)
Convert a message-operation enum to an integer.
scarab::param_ptr_t table_to_param(const AmqpClient::Table &a_table)
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
op_t to_op_t(unsigned an_op_uint)
AmqpClient::BasicMessage::ptr_t amqp_message_ptr
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
std::shared_ptr< msg_request > request_ptr_t
uuid_t generate_nil_uuid()
Generates a UUID containing all 0s.
msg_t to_msg_t(unsigned a_msg_uint)
bool operator==(const message &a_lhs, const message &a_rhs)
bool operator==(const sender_package_version &a_rhs) const
Base class for return codes.
virtual unsigned rc_value() const =0