8 #ifndef DRIPLINE_SCHEDULER_HH_
9 #define DRIPLINE_SCHEDULER_HH_
11 #include "cancelable.hh"
16 #include "member_variables.hh"
19 #include <condition_variable>
64 virtual void operator()( std::function<
void() > an_executable )
102 template<
typename executor = simple_executor,
typename clock = std::chrono::system_clock >
148 void unschedule(
int an_id );
160 mv_referrable_const( executor, the_executor );
166 mv_accessible_static(
int, curr_id )
175 std::condition_variable_any
f_cv;
179 template<
typename executor,
typename clock >
182 template<
typename executor,
typename clock >
185 f_exe_buffer( std::chrono::milliseconds(50) ),
186 f_cycle_time( std::chrono::milliseconds(500) ),
195 template<
typename executor,
typename clock >
208 std::unique_lock< std::recursive_mutex >t_orig_lock( a_orig.f_scheduler_mutex );
210 cancelable::operator=( std::move(a_orig) );
211 f_exe_buffer = std::move(a_orig.f_exe_buffer);
212 f_cycle_time = std::move(a_orig.f_cycle_time);
213 f_events = std::move(a_orig.f_events);
217 template<
typename executor,
typename clock >
220 std::unique_lock< std::recursive_mutex >t_this_lock( f_scheduler_mutex );
221 std::unique_lock< std::recursive_mutex >t_orig_lock( a_orig.f_scheduler_mutex );
223 cancelable::operator=( std::move(a_orig) );
224 f_exe_buffer = std::move(a_orig.f_exe_buffer);
225 f_cycle_time = std::move(a_orig.f_cycle_time);
226 f_events = std::move(a_orig.f_events);
227 f_scheduler_thread = std::move(a_orig.f_scheduler_thread);
232 template<
typename executor,
typename clock >
235 bool t_new_first =
false;
236 std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
237 if( f_events.empty() || an_exe_time < f_events.begin()->first )
239 LDEBUG(
dlog_sh,
"New first event" );
242 LDEBUG(
dlog_sh,
"Inserting new event" );
244 t_event.f_executable = an_executable;
245 t_event.f_id = s_curr_id++;
246 f_events.insert( std::make_pair( an_exe_time, t_event ) );
250 LDEBUG(
dlog_sh,
"That event was first; waking execution thread" );
257 template<
typename executor,
typename clock >
261 if( an_interval < 2*f_exe_buffer )
263 throw dripline_error() <<
"Cannot schedule executions with an interval of less than " << std::chrono::duration_cast<std::chrono::seconds>(2*f_exe_buffer).count() <<
" seconds";
266 std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
268 schedule_repeating( an_executable, an_interval, t_id, an_exe_time );
274 template<
typename executor,
typename clock >
277 LDEBUG(
dlog_sh,
"Scheduling a repeating event" );
280 executable_t t_wrapped_executable = [
this, an_executable, an_interval, an_id, a_rep_start](){
281 if( this->is_canceled() )
return;
282 LDEBUG(
dlog_sh,
"wrapped execution" );
284 this->schedule_repeating( an_executable, an_interval, an_id, a_rep_start + an_interval );
286 LDEBUG(
dlog_sh,
"executing the wrapped executable" );
292 t_event.f_executable = t_wrapped_executable;
293 t_event.f_id = an_id;
296 bool t_new_first =
false;
297 std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
298 if( f_events.empty() || a_rep_start < f_events.begin()->first )
300 LDEBUG(
dlog_sh,
"New first event" );
305 f_events.insert( std::make_pair( a_rep_start, t_event ) );
309 LDEBUG(
dlog_sh,
"That event was first; waking execution thread" );
316 template<
typename executor,
typename clock >
319 std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
320 auto i_event = f_events.begin();
321 for( ; i_event->second.f_id != an_id && i_event != f_events.end(); ++i_event ) {
322 LDEBUG(
dlog_sh,
"Looking for event with id <" << an_id <<
">; found one with id <" << i_event->second.f_id <<
">" );
325 if( i_event->second.f_id == an_id )
327 LDEBUG(
dlog_sh,
"Found event with id <" << i_event->second.f_id <<
">; erasing it now" );
328 f_events.erase( i_event );
329 LDEBUG(
dlog_sh,
"Removed event <" << an_id <<
"> from the schedule" );
333 LDEBUG(
dlog_sh,
"No event with id <" << an_id <<
"> found" );
339 template<
typename executor,
typename clock >
342 LDEBUG(
dlog_sh,
"Starting scheduler" );
343 while( ! is_canceled() )
345 std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
346 if( f_events.empty() )
349 f_cv.wait_for( t_lock, f_cycle_time );
354 auto t_first_event = f_events.begin();
356 duration_t t_to_earliest = t_first_event->first - clock::now();
357 if( t_to_earliest < f_exe_buffer )
360 LDEBUG(
dlog_sh,
"Executing first event from the map" );
361 std::unique_lock< std::mutex > t_exe_lock( f_executor_mutex );
362 f_the_executor( t_first_event->second.f_executable );
363 f_events.erase( t_first_event );
366 if( t_to_earliest < f_cycle_time )
369 f_cv.wait_until( t_lock, t_first_event->first );
373 f_cv.wait_for( t_lock, f_cycle_time );
377 LDEBUG(
dlog_sh,
"Scheduler exiting" );
Dripline-specific errors.
Executes scheduled events.
scheduler & operator=(scheduler &&a_orig)
scheduler(const scheduler &)=delete
typename clock::duration duration_t
virtual ~scheduler()=default
void execute()
Main execution loop for the scheduler.
scheduler & operator=(const scheduler &)=delete
std::thread f_scheduler_thread
snake_case_mv_accessible_static(int, curr_id) protected std::recursive_mutex f_scheduler_mutex
The ID to be used for the next scheduled event.
typename clock::time_point time_point_t
std::multimap< time_point_t, event > events_map_t
void unschedule(int an_id)
Unschedule an event using the event's ID.
std::condition_variable_any f_cv
int schedule(executable_t an_executable, time_point_t an_exe_time)
std::mutex f_executor_mutex
std::function< void() > executable_t
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog_sh("scheduler", __FILE_NAME__, __LINE__)
Base class for executors.
base_executor & operator=(const base_executor &)=default
base_executor(const base_executor &)=default
virtual void operator()(std::function< void() >)=0
base_executor(base_executor &&)=default
virtual ~base_executor()=default
base_executor & operator=(base_executor &&)=default
Definition of an event, including the executable object and the scheduler ID.
executable_t f_executable
Given an executable function object, uses operator() to execute it.
simple_executor & operator=(simple_executor &&)=default
simple_executor & operator=(const simple_executor &)=default
virtual void operator()(std::function< void() > an_executable)
simple_executor(simple_executor &&)=default
simple_executor(const simple_executor &)=default
virtual ~simple_executor()=default