Dripline-Cpp  v2.10.11
Dripline Implementation in C++
scheduler.hh
Go to the documentation of this file.
1 /*
2  * scheduler.hh
3  *
4  * Created on: Aug 13, 2019
5  * Author: N.S. Oblath
6  */
7 
8 #ifndef DRIPLINE_SCHEDULER_HH_
9 #define DRIPLINE_SCHEDULER_HH_
10 
11 #include "cancelable.hh"
12 
13 #include "dripline_exceptions.hh"
14 
15 #include "logger.hh"
16 #include "member_variables.hh"
17 
18 #include <chrono>
19 #include <condition_variable>
20 #include <functional>
21 #include <thread>
22 #include <map>
23 #include <mutex>
24 #include <utility>
25 
26 LOGGER( dlog_sh, "scheduler" )
27 
28 namespace dripline
29 {
// Body of base_executor, the interface that executor types implement.
40  {
// Copy construction and move construction use the compiler-generated defaults.
42  base_executor( const base_executor& ) = default;
43  base_executor( base_executor&& ) = default;
// Virtual (defaulted) destructor.
44  virtual ~base_executor() = default;
// Copy assignment uses the compiler-generated default.
45  base_executor& operator=( const base_executor& ) = default;
// Pure-virtual call operator: takes a std::function< void() > by value; no implementation is supplied here.
47  virtual void operator()( std::function< void() > ) = 0;
48  };
49 
// Body of simple_executor: an executor that simply calls the function object it is handed.
57  {
// Copy construction uses the compiler-generated default.
59  simple_executor( const simple_executor& ) = default;
// Virtual (defaulted) destructor.
61  virtual ~simple_executor() = default;
// Logs a debug message and then invokes an_executable once, on the calling thread.
64  virtual void operator()( std::function< void() > an_executable )
65  {
66  LDEBUG( dlog_sh, "executing" );
67  an_executable();
68  return;
69  }
70  };
71 
102  template< typename executor = simple_executor, typename clock = std::chrono::system_clock >
103  class DRIPLINE_API scheduler : virtual public scarab::cancelable
104  {
105  public:
106  using clock_t = clock;
107  using time_point_t = typename clock::time_point;
108  using duration_t = typename clock::duration;
109  using executable_t = std::function< void() >;
110 
115  struct event
116  {
118  int f_id;
119  };
120  typedef std::multimap< time_point_t, event > events_map_t;
121 
122  scheduler();
123  scheduler( const scheduler& ) = delete;
125  virtual ~scheduler() = default;
126 
127  scheduler& operator=( const scheduler& ) = delete;
129 
136  int schedule( executable_t an_executable, time_point_t an_exe_time );
137 
145  int schedule( executable_t an_executable, duration_t an_interval, time_point_t an_exe_time = clock::now() );
146 
148  void unschedule( int an_id );
149 
151  void execute();
152 
154  mv_accessible( duration_t, exe_buffer );
155 
157  mv_accessible( duration_t, cycle_time );
158 
160  mv_referrable_const( executor, the_executor );
161 
163  mv_referrable_const( events_map_t, events );
164 
166  mv_accessible_static( int, curr_id )
167 
168  protected:
169  void schedule_repeating( executable_t an_executable, duration_t an_interval, int an_id, time_point_t a_rep_start = clock::now() );
170 
171  std::recursive_mutex f_scheduler_mutex; // recursive_mutex is used so that the mutex can be locked twice by the same thread when using a repeating schedule
172 
173  std::mutex f_executor_mutex;
174 
175  std::condition_variable_any f_cv;
176  std::thread f_scheduler_thread;
177  };
178 
179  template< typename executor, typename clock >
181 
182  template< typename executor, typename clock >
184  cancelable(),
185  f_exe_buffer( std::chrono::milliseconds(50) ),
186  f_cycle_time( std::chrono::milliseconds(500) ),
187  f_the_executor(),
188  f_events(),
189  f_scheduler_mutex(),
190  f_executor_mutex(),
191  f_cv(),
192  f_scheduler_thread()
193  {}
194 
195  template< typename executor, typename clock >
197  cancelable(),
198  f_exe_buffer(),
199  f_cycle_time(),
200  f_the_executor(),
201  f_events(),
202  f_scheduler_mutex(),
203  f_executor_mutex(),
204  f_cv(),
205  f_scheduler_thread()
206  {
207  std::unique_lock< std::recursive_mutex >t_this_lock( f_scheduler_mutex );
208  std::unique_lock< std::recursive_mutex >t_orig_lock( a_orig.f_scheduler_mutex );
209 
210  cancelable::operator=( std::move(a_orig) );
211  f_exe_buffer = std::move(a_orig.f_exe_buffer);
212  f_cycle_time = std::move(a_orig.f_cycle_time);
213  f_events = std::move(a_orig.f_events);
214  f_scheduler_thread = std::move(a_orig.f_scheduler_thread);
215  }
216 
217  template< typename executor, typename clock >
218  scheduler< executor, clock>& scheduler< executor, clock >::operator=( scheduler< executor, clock >&& a_orig )
219  {
220  std::unique_lock< std::recursive_mutex >t_this_lock( f_scheduler_mutex );
221  std::unique_lock< std::recursive_mutex >t_orig_lock( a_orig.f_scheduler_mutex );
222 
223  cancelable::operator=( std::move(a_orig) );
224  f_exe_buffer = std::move(a_orig.f_exe_buffer);
225  f_cycle_time = std::move(a_orig.f_cycle_time);
226  f_events = std::move(a_orig.f_events);
227  f_scheduler_thread = std::move(a_orig.f_scheduler_thread);
228 
229  return *this;
230  }
231 
232  template< typename executor, typename clock >
234  {
235  bool t_new_first = false;
236  std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
237  if( f_events.empty() || an_exe_time < f_events.begin()->first )
238  {
239  LDEBUG( dlog_sh, "New first event" );
240  t_new_first = true;
241  }
242  LDEBUG( dlog_sh, "Inserting new event" );
243  event t_event;
244  t_event.f_executable = an_executable;
245  t_event.f_id = s_curr_id++;
246  f_events.insert( std::make_pair( an_exe_time, t_event ) );
247  if( t_new_first )
248  {
249  // wake the waiting thread
250  LDEBUG( dlog_sh, "That event was first; waking execution thread" );
251  f_cv.notify_one();
252  }
253 
254  return t_event.f_id;
255  }
256 
257  template< typename executor, typename clock >
258  int scheduler< executor, clock >::schedule( executable_t an_executable, duration_t an_interval, time_point_t an_exe_time )
259  {
260  // if the interval is too short, it's more likely that the execution time will be longer than the interval
261  if( an_interval < 2*f_exe_buffer )
262  {
263  throw dripline_error() << "Cannot schedule executions with an interval of less than " << std::chrono::duration_cast<std::chrono::seconds>(2*f_exe_buffer).count() << " seconds";
264  }
265 
266  std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
268  schedule_repeating( an_executable, an_interval, t_id, an_exe_time );
269 
270  // return the id
271  return t_id;
272  }
273 
274  template< typename executor, typename clock >
275  void scheduler< executor, clock >::schedule_repeating( executable_t an_executable, duration_t an_interval, int an_id, time_point_t a_rep_start )
276  {
277  LDEBUG( dlog_sh, "Scheduling a repeating event" );
278 
279  // create the wrapper executable around the event
280  executable_t t_wrapped_executable = [this, an_executable, an_interval, an_id, a_rep_start](){
281  if( this->is_canceled() ) return;
282  LDEBUG( dlog_sh, "wrapped execution" );
283  // reschedule itself an_interval in the future
284  this->schedule_repeating( an_executable, an_interval, an_id, a_rep_start + an_interval );
285  // execute the event
286  LDEBUG( dlog_sh, "executing the wrapped executable" );
287  an_executable();
288  };
289 
290  // create the event
291  event t_event;
292  t_event.f_executable = t_wrapped_executable;
293  t_event.f_id = an_id;
294 
295  // check if this'll be a new first event
296  bool t_new_first = false;
297  std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
298  if( f_events.empty() || a_rep_start < f_events.begin()->first )
299  {
300  LDEBUG( dlog_sh, "New first event" );
301  t_new_first = true;
302  }
303 
304  // add the event to the map
305  f_events.insert( std::make_pair( a_rep_start, t_event ) );
306  if( t_new_first )
307  {
308  // wake the waiting thread
309  LDEBUG( dlog_sh, "That event was first; waking execution thread" );
310  f_cv.notify_one();
311  }
312 
313  return;
314  }
315 
316  template< typename executor, typename clock >
318  {
319  std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
320  auto i_event = f_events.begin();
321  for( ; i_event->second.f_id != an_id && i_event != f_events.end(); ++i_event ) {
322  LDEBUG( dlog_sh, "Looking for event with id <" << an_id << ">; found one with id <" << i_event->second.f_id << ">" );
323  }
324 
325  if( i_event->second.f_id == an_id )
326  {
327  LDEBUG( dlog_sh, "Found event with id <" << i_event->second.f_id << ">; erasing it now" );
328  f_events.erase( i_event );
329  LDEBUG( dlog_sh, "Removed event <" << an_id << "> from the schedule" );
330  }
331  else
332  {
333  LDEBUG( dlog_sh, "No event with id <" << an_id << "> found" );
334  }
335 
336  return;
337  }
338 
339  template< typename executor, typename clock >
341  {
342  LDEBUG( dlog_sh, "Starting scheduler" );
343  while( ! is_canceled() )
344  {
345  std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
346  if( f_events.empty() )
347  {
348  // wait for f_cycle_time
349  f_cv.wait_for( t_lock, f_cycle_time );
350  continue;
351  }
352  else
353  {
354  auto t_first_event = f_events.begin();
355  //time_point_t t_earliest = t_first_exe.first;
356  duration_t t_to_earliest = t_first_event->first - clock::now();
357  if( t_to_earliest < f_exe_buffer )
358  {
359  // do event now
360  LDEBUG( dlog_sh, "Executing first event from the map" );
361  std::unique_lock< std::mutex > t_exe_lock( f_executor_mutex );
362  f_the_executor( t_first_event->second.f_executable );
363  f_events.erase( t_first_event );
364  continue;
365  }
366  if( t_to_earliest < f_cycle_time )
367  {
368  // wait until t_first_event->first
369  f_cv.wait_until( t_lock, t_first_event->first );
370  continue;
371  }
372  // wait for f_cycle_time
373  f_cv.wait_for( t_lock, f_cycle_time );
374  continue;
375  }
376  }
377  LDEBUG( dlog_sh, "Scheduler exiting" );
378  return;
379  }
380 
381 } /* namespace dripline */
382 
383 #endif /* DRIPLINE_SCHEDULER_HH_ */
Dripline-specific errors.
Executes scheduled events.
Definition: scheduler.hh:104
scheduler & operator=(scheduler &&a_orig)
scheduler(const scheduler &)=delete
scheduler(scheduler &&)
typename clock::duration duration_t
Definition: scheduler.hh:108
virtual ~scheduler()=default
void execute()
Main execution loop for the scheduler.
Definition: scheduler.hh:340
scheduler & operator=(const scheduler &)=delete
std::thread f_scheduler_thread
Definition: scheduler.hh:176
snake_case_mv_accessible_static(int, curr_id) protected std::recursive_mutex f_scheduler_mutex
The ID to be used for the next scheduled event.
Definition: scheduler.hh:166
typename clock::time_point time_point_t
Definition: scheduler.hh:107
std::multimap< time_point_t, event > events_map_t
Definition: scheduler.hh:120
void unschedule(int an_id)
Unschedule an event using the event's ID.
Definition: scheduler.hh:317
std::condition_variable_any f_cv
Definition: scheduler.hh:175
int schedule(executable_t an_executable, time_point_t an_exe_time)
Definition: scheduler.hh:233
std::mutex f_executor_mutex
Definition: scheduler.hh:173
std::function< void() > executable_t
Definition: scheduler.hh:109
#define DRIPLINE_API
Definition: dripline_api.hh:34
static ::scarab::logger_type< ::scarab::spd_initializer_async_stdout_color_mt > dlog_sh("scheduler", __FILE_NAME__, __LINE__)
Base class for executors.
Definition: scheduler.hh:40
base_executor & operator=(const base_executor &)=default
base_executor(const base_executor &)=default
virtual void operator()(std::function< void() >)=0
base_executor(base_executor &&)=default
virtual ~base_executor()=default
base_executor & operator=(base_executor &&)=default
Definition of an event, including the executable object and the scheduler ID.
Definition: scheduler.hh:116
Given an executable function object, uses operator() to execute it.
Definition: scheduler.hh:57
simple_executor & operator=(simple_executor &&)=default
simple_executor & operator=(const simple_executor &)=default
virtual void operator()(std::function< void() > an_executable)
Definition: scheduler.hh:64
simple_executor(simple_executor &&)=default
simple_executor(const simple_executor &)=default
virtual ~simple_executor()=default