HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
NET_WebScheduler.h
Go to the documentation of this file.
1 /*
2  * PROPRIETARY INFORMATION. This software is proprietary to
3  * Side Effects Software Inc., and is not to be reproduced,
4  * transmitted, or disclosed in any way without written permission.
5  *
6  * NAME: NET_Scheduler.h
7  *
8  * COMMENTS:
9  * A simple scheduler for the web server to fire callbacks.
10  */
11 
12 #ifndef __NET_WEBSCHEDULER_H__
13 #define __NET_WEBSCHEDULER_H__
14 
15 #include "NET_API.h"
16 
17 #include <UT/UT_Array.h>
18 #include <UT/UT_Condition.h>
19 #include <UT/UT_Debug.h>
20 #include <UT/UT_Functor.h>
21 #include <UT/UT_Lock.h>
22 #include <UT/UT_SysClone.h>
23 #include <UT/UT_Thread.h>
24 
25 #include <algorithm>
26 #include <chrono>
27 #include <functional>
28 
29 template <class ClockT, class DurationT>
31 {
32 public:
33  class Event;
34 
36  using TimePoint = std::chrono::time_point<ClockT, DurationT>;
37  using Comparator = std::greater<Event>;
38 
39  using EventId = int;
40 
41  NET_Scheduler();
43 
44  // Single callback to fire at a later time.
45  class Event
46  {
47  public:
48  Event() : myId(0), myTime(), myCallback(), myRunOnMainThread(true) {}
49 
50  Event(EventId id, const TimePoint &et, Callback clb, bool run_on_mt)
51  : myId(id)
52  , myTime(et)
53  , myCallback(clb)
54  , myRunOnMainThread(run_on_mt)
55  {
56  }
57 
58  bool operator<(const Event &rhs) const
59  {
60  if (myTime != rhs.myTime)
61  return myTime < rhs.myTime;
62  return myId < rhs.myId;
63  }
64  bool operator>(const Event &rhs) const
65  {
66  if (myTime != rhs.myTime)
67  return myTime > rhs.myTime;
68  return myId > rhs.myId;
69  }
70 
71  // Check if this scheduled callback is read to fire
72  bool isReadyToFire() const
73  {
74  return std::chrono::time_point_cast<std::chrono::milliseconds>(
75  ClockT::now()) >= myTime;
76  }
77  DurationT fireIn() const
78  {
79  return myTime -
80  std::chrono::time_point_cast<std::chrono::milliseconds>(
81  ClockT::now());
82  }
83 
85  // The time in which to fire this event
89  };
90 
91  // Start the scheduler
92  void start();
93  // Stop the scheduler
94  void stop();
95  // Pause the scheduler
96  void pause();
97  // Resume the scheduler
98  void resume();
99 
100  // Run when the second the scheduler is free
101  EventId run(Callback func, bool run_on_main_thread = true)
102  {
103  return insert(DurationT{0}, func, run_on_main_thread);
104  }
105  // Run in x seconds
106  template <class T>
107  EventId runIn(const T &t, Callback func, bool run_on_main_thread = true)
108  {
109  auto mill_t = std::chrono::duration_cast<DurationT>(t);
110  return insert(mill_t, func, run_on_main_thread);
111  }
112  // runs every x seconds
113  template <class T>
114  EventId runEvery(const T &t, Callback func, bool run_on_main_thread = true)
115  {
116  auto mill_t = std::chrono::duration_cast<DurationT>(t);
117  return insert(
118  mill_t,
119  [=]() {
120  func();
121  runEvery(mill_t, func, run_on_main_thread);
122  },
123  run_on_main_thread);
124  }
125  // Run at a given time.
127  const TimePoint &tp,
128  Callback func,
129  bool run_on_main_thread = true)
130  {
131  return insert(tp, func, run_on_main_thread);
132  }
133 
134  bool remove(EventId id);
135 
136  /// @brief Get the amount of time until next event is ready.
137  ///
138  /// @return elapsed time until next event is scheduled to be fired.
139  DurationT elapsedTimeToNextEvent() const
140  {
141  UT_AutoLock lock(myLock);
142 
143  if (myQueue.entries() <= 0)
144  return DurationT{};
145 
146  // check the first element and see if we can pop the heap
147  return myQueue.begin()->fireIn();
148  }
149 
150  // Check if the top most event is ready to fire.
151  void checkEvents();
152 
153  bool isQueueEmpty() const
154  {
155  UT_AutoLock lock(myLock);
156  return myQueue.entries() <= 0;
157  }
158  bool isRunning() const { return myState == State::kRunning; }
159  bool isPaused() const { return myState == State::kPaused; }
160 
161 private:
162  enum class State
163  {
164  kStopped,
165  kRunning,
166  kPaused,
167  kStopping
168  };
169 
170  static void *threadCB(void *data);
171  // Insert an event
172  EventId insert(DurationT t, Callback func, bool run_on_mt);
173  EventId insert(const TimePoint &tp, Callback func, bool run_on_mt);
174 
175  void notifyReady();
176  bool internalWaitForEvents();
177  void internalProcessEvents();
178  bool getNextEvent(Event &e);
179 
180  mutable UT_Lock myLock;
181  EventId myIdCounter;
182  // We need the ability to cancel callbacks which means we need to be able
183  // to iterate over the queue. Priority queues sadly do not allow for
184  // iterating over them
185  UT_Array<Event> myQueue;
186  State myState;
187 
188  mutable UT_Lock myBackendLock;
189  UT_Condition myConVar;
190  bool myReady;
191  bool myIsWorking;
192 
193  int myMaxEventsToProcess;
194 
195  UT_Array<Event> myWaitingEvents;
196  UT_Thread *myBackgroundThread;
197 };
198 
199 template <typename ClockT, typename DurationT>
201  : myLock()
202  , myBackendLock()
203  , myIdCounter(0)
204  , myQueue()
205  , myBackgroundThread(nullptr)
206  , myState(State::kStopped)
207  , myWaitingEvents()
208  , myReady(false)
209  , myIsWorking(false)
210  , myMaxEventsToProcess(1000)
211 {
212 }
213 
214 template <typename ClockT, typename DurationT>
216 {
217  stop();
218 }
219 
220 template <typename ClockT, typename DurationT>
221 void
223 {
224  UT_AutoLock lock(myBackendLock);
225  if (!myReady)
226  {
227  myReady = true;
228  myConVar.triggerOne();
229  }
230 }
231 
232 template <typename ClockT, typename DurationT>
233 void
235 {
236  UT_AutoLock lock(myLock);
237 
238  if (myQueue.isEmpty())
239  return;
240 
241  if (myQueue.begin()->isReadyToFire())
242  {
243  if (myQueue.begin()->myCallback)
244  {
245  // Check if we are to run this callback on the main thread or
246  // our background thread
247  if (myQueue.begin()->myRunOnMainThread)
248  myQueue.begin()->myCallback();
249  else
250  {
251  {
252  UT_AutoLock lock(myBackendLock);
253  myWaitingEvents.emplace_back(myQueue[0]);
254  }
255  notifyReady();
256  }
257  }
258  std::pop_heap(myQueue.begin(), myQueue.end(), Comparator{});
259  myQueue.removeLast();
260  }
261 }
262 
263 template <typename ClockT, typename DurationT>
264 bool
266 {
267  UT_AutoLock lock(myLock);
268 
269  if (myQueue.entries() <= 0)
270  return false;
271 
272  // check the first element and see if we can pop the heap
273  if (myQueue.begin()->myId == id)
274  {
275  // Move the first element to the back and then remove it from the queue
276  std::pop_heap(myQueue.begin(), myQueue.end(), Comparator{});
277  myQueue.removeLast();
278  return true;
279  }
280 
281  for (exint i = myQueue.entries() - 1; i-- > 1;)
282  {
283  if (myQueue[i].myId == id)
284  {
285  myQueue.removeIndex(i);
286  std::make_heap(myQueue.begin(), myQueue.end(), Comparator{});
287  return true;
288  }
289  }
290 
291  return false;
292 }
293 
294 template <typename ClockT, typename DurationT>
297  DurationT t,
298  Callback func,
299  bool run_on_mt)
300 {
301  TimePoint fst = std::chrono::time_point_cast<DurationT>(ClockT::now());
302  fst += t;
303 
304  return insert(fst, func, run_on_mt);
305 }
306 
307 template <typename ClockT, typename DurationT>
310  const TimePoint &tp,
311  Callback func,
312  bool run_on_mt)
313 {
314  UT_AutoLock lock(myLock);
315  EventId id = myIdCounter++;
316 
317  myQueue.emplace_back(id, tp, func, run_on_mt);
318  std::push_heap(myQueue.begin(), myQueue.end(), Comparator{});
319 
320  return id;
321 }
322 
323 template <typename ClockT, typename DurationT>
324 void *
326 {
327  NET_Scheduler *scheduler = static_cast<NET_Scheduler *>(data);
328  while (scheduler->isRunning() || scheduler->isPaused())
329  {
330  if (scheduler->internalWaitForEvents() && scheduler->isRunning())
331  {
332  scheduler->myIsWorking = true;
333  scheduler->internalProcessEvents();
334  }
335  scheduler->myIsWorking = false;
336  }
337 
338  return nullptr;
339 }
340 
341 template <typename ClockT, typename DurationT>
342 bool
344 {
345  UT_AutoLock lock(myBackendLock);
346  if (!myReady)
347  {
348  myConVar.waitForTrigger(myBackendLock);
349  if (!myReady)
350  return false;
351  myReady = false;
352  }
353  return true;
354 }
355 
356 template <typename ClockT, typename DurationT>
357 void
359 {
360  for (unsigned i = 0; i < myMaxEventsToProcess && isRunning(); ++i)
361  {
362  Event e;
363  if (!getNextEvent(e))
364  {
365  // If we didnt get a new work item then check if we should set
366  // ourselves as not ready. If we dont then its possible to get
367  // into a state where we are always ready and never waiting for
368  // a trigger causing our CPU usage to skyrocket.
369  UT_AutoLock lock(myBackendLock);
370  if (myWaitingEvents.size() <= 0)
371  {
372  myReady = false;
373  }
374  break;
375  }
376  if (e.myCallback)
377  e.myCallback();
378  }
379 }
380 
381 template <typename ClockT, typename DurationT>
382 bool
385 {
386  UT_AutoLock lock(myBackendLock);
387  if (myWaitingEvents.size() > 0)
388  {
389  e = myWaitingEvents[0];
390  myWaitingEvents.removeIndex(0);
391  return true;
392  }
393 
394  return false;
395 }
396 
397 template <typename ClockT, typename DurationT>
398 void
400 {
401  if (isRunning() || isPaused())
402  return;
403 
404  myState = State::kRunning;
406  myBackgroundThread->startThread(threadCB, this);
407 }
408 
409 template <typename ClockT, typename DurationT>
410 void
412 {
413  if (myState == State::kStopped || myState == State::kStopping)
414  return;
415  myState = State::kStopping;
416 
417  {
418  UT_AutoLock lock(myBackendLock);
419  myWaitingEvents.clear();
420  }
421 
422  notifyReady();
423 
424  if (myBackgroundThread != nullptr)
425  {
426  myBackgroundThread->waitForState(UT_Thread::ThreadIdle);
427  delete myBackgroundThread;
428  myBackgroundThread = nullptr;
429  }
430 
431  myState = State::kStopped;
432 }
433 
434 template <typename ClockT, typename DurationT>
435 void
437 {
438  if (myState != State::kRunning)
439  return;
440 
441  {
442  UT_AutoLock lock(myBackendLock);
443  myReady = false;
444  myState = State::kPaused;
445  myConVar.triggerOne();
446  }
447 
448  time_t t = time(NULL);
449  // Wait for us to finish working.
450  while (myIsWorking)
451  {
452  // Wait a maximum amount of 5 seconds before exit.
453  if (time(NULL) - t > 5)
454  break;
455  UTnap(30);
456  }
457 }
458 
459 template <typename ClockT, typename DurationT>
460 void
462 {
463  if (myState != State::kPaused)
464  return;
465 
466  {
467  UT_AutoLock lock(myBackendLock);
468  myState = State::kRunning;
469  }
470  notifyReady();
471 }
472 
473 using NET_WebScheduler =
475 
476 #endif // __NET_WEBSCHEDULER_H__
477 
bool remove(EventId id)
vbool4 insert(const vbool4 &a, bool val)
Helper: substitute val for a[i].
Definition: simd.h:3340
bool isRunning() const
GLuint id
Definition: glew.h:1679
GT_API const UT_StringHolder time
bool isReadyToFire() const
bool operator<(const Event &rhs) const
int64 exint
Definition: SYS_Types.h:125
EventId runIn(const T &t, Callback func, bool run_on_main_thread=true)
Condition synchronization primitive.
Definition: UT_Condition.h:25
EventId run(Callback func, bool run_on_main_thread=true)
bool operator>(const Event &rhs) const
GLint GLenum GLsizei GLint GLsizei const void * data
Definition: glew.h:1379
typedef int(WINAPI *PFNWGLRELEASEPBUFFERDCARBPROC)(HPBUFFERARB hPbuffer
bool isQueueEmpty() const
UT_Functor< void > Callback
GLenum func
Definition: glcorearb.h:782
bool isPaused() const
std::chrono::time_point< std::chrono::system_clock, std::chrono::milliseconds > TimePoint
EventId runEvery(const T &t, Callback func, bool run_on_main_thread=true)
DurationT fireIn() const
DurationT elapsedTimeToNextEvent() const
Get the amount of time until next event is ready.
static UT_Thread * allocThread(SpinMode spin_mode, bool uses_tbb=true)
EventId runAt(const TimePoint &tp, Callback func, bool run_on_main_thread=true)
GLdouble GLdouble t
Definition: glew.h:1398
Event(EventId id, const TimePoint &et, Callback clb, bool run_on_mt)