8 #define DRIPLINE_API_EXPORTS
16 #include "signal_handler.hh"
18 LOGGER(
dlog,
"receiver" );
33 f_messages( std::move(a_orig.f_messages) ),
34 f_chunks_received( a_orig.f_chunks_received ),
35 f_routing_key( std::move(a_orig.f_routing_key) ),
36 f_thread( std::move(a_orig.f_thread) ),
39 f_processing( a_orig.f_processing.load() )
41 a_orig.f_chunks_received = 0;
42 a_orig.f_processing.store(
false );
48 f_incoming_messages(),
49 f_single_message_wait_ms( 1000 ),
50 f_reply_listen_timeout_ms( 1000 )
55 cancelable::operator=( std::move(a_orig) );
56 f_incoming_messages = std::move(a_orig.f_incoming_messages);
57 f_single_message_wait_ms = a_orig.f_single_message_wait_ms;
58 f_reply_listen_timeout_ms = a_orig.f_reply_listen_timeout_ms;
67 LDEBUG(
dlog,
"Received a message chunk <" << t_message->MessageId() );
70 std::string t_message_id( std::get<0>(t_parsed_message_id) );
71 if( incoming_messages().count( t_message_id ) == 0 )
74 LDEBUG(
dlog,
"This is the first chunk for this message; creating new message pack" );
78 t_pack.
f_messages.resize( std::get<2>(t_parsed_message_id) );
80 t_pack.
f_messages[std::get<1>(t_parsed_message_id)] = t_message;
87 LDEBUG(
dlog,
"Single-chunk message being sent directly to processing" );
93 t_pack.
f_thread = std::thread([
this, &t_pack, &t_parsed_message_id](){
wait_for_message(t_pack, std::get<0>(t_parsed_message_id)); });
100 LDEBUG(
dlog,
"This is not the first chunk for this message; adding to message pack" );
104 LWARN(
dlog,
"Message <" << std::get<0>(t_parsed_message_id) <<
"> is already being processed\n" <<
105 "Just received chunk " << std::get<1>(t_parsed_message_id) <<
" of " << std::get<2>(t_parsed_message_id) );
110 std::unique_lock< std::mutex > t_lock( t_pack.
f_mutex );
111 if( t_pack.
f_messages[std::get<1>(t_parsed_message_id)] )
113 LWARN(
dlog,
"Received duplicate message chunk for message <" << std::get<0>(t_parsed_message_id) <<
">; chunk " << std::get<1>(t_parsed_message_id) );
118 t_pack.
f_messages[std::get<1>(t_parsed_message_id)] = t_message;
122 t_pack.
f_conv.notify_one();
129 LERROR(
dlog,
"Dripline exception caught while handling message chunk: " << e.what() );
131 catch( std::exception& e )
133 LERROR(
dlog,
"Standard exception caught while handling message chunk: " << e.what() );
141 std::unique_lock< std::mutex > t_lock( a_pack.
f_mutex );
153 auto t_now = std::chrono::system_clock::now();
154 while( a_pack.
f_conv.wait_until( t_lock, t_now + std::chrono::milliseconds(f_single_message_wait_ms) ) == std::cv_status::no_timeout )
167 LWARN(
dlog,
"Timed out; message may be incomplete" );
181 incoming_messages().erase( a_message_id );
191 LERROR(
dlog,
"Dripline exception caught while processing message pack: " << e.what() );
193 catch( std::exception& e )
195 LERROR(
dlog,
"Standard exception caught while processing message pack: " << e.what() );
203 throw dripline_error() <<
"Process_message function has not been implemented";
214 if ( ! a_receive_reply->f_channel )
219 if( a_timeout_ms != 0 )
221 LDEBUG(
dlog,
"Waiting for a reply (timeout: " << a_timeout_ms <<
" ms)" );
225 LDEBUG(
dlog,
"Waiting for a reply (no timeout)" );
229 unsigned t_chunk_timeout_ms = f_reply_listen_timeout_ms;
230 if( a_timeout_ms > 0 && a_timeout_ms < t_chunk_timeout_ms )
232 t_chunk_timeout_ms = a_timeout_ms;
236 auto t_timeout_time = std::chrono::system_clock::now() + std::chrono::milliseconds(a_timeout_ms);
243 while( ! is_canceled() && (a_timeout_ms == 0 || std::chrono::system_clock::now() < t_timeout_time) )
246 core::listen_for_message( t_envelope, a_status, a_receive_reply->f_channel, a_receive_reply->f_consumer_tag, t_chunk_timeout_ms,
false );
251 LDEBUG(
dlog,
"Receiver was canceled before receiving reply" );
258 LWARN(
dlog,
"There was a soft error while listening for a reply; no message received" );
265 LERROR(
dlog,
"There was a hard error error while listening for a reply; no message received" );
272 LTRACE(
dlog,
"Listening for reply message chunks timed out" );
280 LERROR(
dlog,
"An unknown status occurred while listening for messages" );
291 LDEBUG(
dlog,
"Received a message chunk <" << t_message->MessageId() );
294 if( f_incoming_messages.count( std::get<0>(t_parsed_message_id) ) == 0 )
297 LDEBUG(
dlog,
"This is the first chunk for this message; creating new message pack" );
301 t_pack.
f_messages.resize( std::get<2>(t_parsed_message_id) );
303 t_pack.
f_messages[std::get<1>(t_parsed_message_id)] = t_message;
316 LDEBUG(
dlog,
"This is not the first chunk for this message; adding to message pack" );
320 LWARN(
dlog,
"Message <" << std::get<0>(t_parsed_message_id) <<
"> is already being processed\n" <<
321 "Just received chunk " << std::get<1>(t_parsed_message_id) <<
" of " << std::get<2>(t_parsed_message_id) );
325 if( t_pack.
f_messages[std::get<1>(t_parsed_message_id)] )
327 LWARN(
dlog,
"Received duplicate message chunk for message <" << std::get<0>(t_parsed_message_id) <<
">; chunk " << std::get<1>(t_parsed_message_id) );
332 t_pack.
f_messages[std::get<1>(t_parsed_message_id)] = t_message;
345 LERROR(
dlog,
"There was a problem processing the message: " << e.what() );
353 if( ! is_canceled() && std::chrono::system_clock::now() > t_timeout_time )
355 LINFO(
dlog,
"Listening for reply message timed out" );
360 LDEBUG(
dlog,
"Receiver was canceled" );
371 f_incoming_messages.erase( a_message_id );
373 if( t_message->is_reply() )
375 return std::static_pointer_cast< msg_reply >( t_message );
384 LERROR(
dlog,
"Dripline exception caught while handling message: " << e.what() );
388 LERROR(
dlog,
"AMQP exception caught while sending reply: (" << e.reply_code() <<
") " << e.reply_text() );
392 LERROR(
dlog,
"AMQP Library Exception caught while sending reply: (" << e.ErrorCode() <<
") " << e.what() );
394 catch( std::exception& e )
396 LERROR(
dlog,
"Standard exception caught while sending reply: " << e.what() );
425 f_message_queue.push( a_message );
433 while( ! is_canceled() )
436 if( f_message_queue.timed_wait_and_pop( t_message ) )
442 catch(
const std::exception& e )
445 LERROR(
dlog,
"Exception caught; shutting down.\n" <<
"\t" << e.what() );
446 scarab::signal_handler::cancel_all( RETURN_ERROR );
Receives and processes messages concurrently.
virtual ~concurrent_receiver()
concurrent_receiver & operator=(const concurrent_receiver &)=delete
void execute()
Handles messages that appear in the concurrent queue by calling submit_message().
virtual void submit_message(message_ptr_t a_message)=0
virtual void process_message(message_ptr_t a_message)
Deposits the message in the concurrent queue (called by the listener)
static void listen_for_message(amqp_envelope_ptr &a_envelope, post_listen_status &a_status, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
Listen for a single AMQP message.
@ timeout
A timeout occurred, and the channel is still valid.
@ unknown
Initialized or unknown status.
@ soft_error
An error occurred, but the channel should still be valid.
@ hard_error
An error occurred, and the channel is no longer valid.
Dripline-specific errors.
static message_ptr_t process_message(amqp_split_message_ptrs a_message_ptrs, const std::string &a_routing_key)
Converts a set of AMQP messages to a Dripline message object.
static std::tuple< std::string, unsigned, unsigned > parse_message_id(const std::string &a_message_id)
Parses the message ID, which should be of the form [UUID]/[chunk]/[total chunks].
A receiver is able to collect Dripline message chunks and reassemble them into a complete Dripline message.
void process_message_pack(incoming_message_pack &a_pack, const std::string &a_message_id)
Converts a message pack into a Dripline message, and then submits the message for processing.
receiver & operator=(const receiver &a_orig)=delete
void handle_message_chunk(amqp_envelope_ptr a_envelope)
virtual void process_message(message_ptr_t a_message)
reply_ptr_t wait_for_reply(const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0)
reply_ptr_t process_received_reply(incoming_message_pack &a_pack, const std::string &a_message_id)
void wait_for_message(incoming_message_pack &a_pack, const std::string &a_message_id)
AmqpClient::AmqpLibraryException amqp_lib_exception
std::shared_ptr< message > message_ptr_t
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
std::shared_ptr< msg_reply > reply_ptr_t
AmqpClient::BasicMessage::ptr_t amqp_message_ptr
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("agent", __FILE_NAME__, __LINE__)
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
AmqpClient::AmqpException amqp_exception
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog("receiver", __FILE_NAME__, __LINE__)
Stores the basic information about a set of message chunks that will eventually make a Dripline message.
std::condition_variable f_conv
unsigned f_chunks_received
std::string f_routing_key
std::atomic< bool > f_processing
amqp_split_message_ptrs f_messages