thread.h
1 // Copyright Contributors to the OpenImageIO project.
2 // SPDX-License-Identifier: Apache-2.0
3 // https://github.com/AcademySoftwareFoundation/OpenImageIO
4 
5 // clang-format off
6 
7 /////////////////////////////////////////////////////////////////////////
8 /// @file thread.h
9 ///
10 /// @brief Wrappers and utilities for multithreading.
11 /////////////////////////////////////////////////////////////////////////
12 
13 
14 #pragma once
15 
16 #include <algorithm>
17 #include <atomic>
18 #include <chrono>
19 #include <functional>
20 #include <future>
21 #include <iostream>
22 #include <memory>
23 #include <mutex>
24 #include <thread>
25 #include <vector>
26 
27 #include <OpenImageIO/atomic.h>
28 #include <OpenImageIO/dassert.h>
29 #include <OpenImageIO/export.h>
30 #include <OpenImageIO/oiioversion.h>
31 #include <OpenImageIO/platform.h>
32 
33 
34 
35 // OIIO_THREAD_ALLOW_DCLP, if set to 0, prevents us from using a dodgy
36 // "double checked lock pattern" (DCLP). We are very careful to construct
37 // it safely and correctly, and these uses improve thread performance for
38 // us. But it confuses Thread Sanitizer, so this switch allows you to turn
39 // it off. Also set to 0 if you don't believe that we are correct in
40 // allowing this construct on all platforms.
41 #ifndef OIIO_THREAD_ALLOW_DCLP
42 # define OIIO_THREAD_ALLOW_DCLP 1
43 #endif
44 
45 
46 
47 // Some helpful links:
48 //
49 // Descriptions of the "new" gcc atomic intrinsics:
50 // https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html
51 // Old gcc atomic intrinsics:
52 // https://gcc.gnu.org/onlinedocs/gcc-4.4.2/gcc/Atomic-Builtins.html
53 // C++11 and beyond std::atomic:
54 // http://en.cppreference.com/w/cpp/atomic
55 
56 
57 
58 OIIO_NAMESPACE_BEGIN
59 
60 /// Null mutex that can be substituted for a real one to test how much
61 /// overhead is associated with a particular mutex.
62 class null_mutex {
63 public:
64  null_mutex() noexcept {}
65  ~null_mutex() noexcept {}
66  void lock() noexcept {}
67  void unlock() noexcept {}
68  void lock_shared() noexcept {}
69  void unlock_shared() noexcept {}
70  bool try_lock() noexcept { return true; }
71 };
72 
73 /// Null lock that can be substituted for a real one to test how much
74 /// overhead is associated with a particular lock.
75 template<typename T> class null_lock {
76 public:
77  null_lock(T& /*m*/) noexcept {}
78 };
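// Usage sketch (illustrative only; Accumulator and its members are
// hypothetical, not part of this header): templating on the mutex type lets
// the same code be timed with a real mutex and with null_mutex, which
// compiles away to nothing. null_lock<T> can likewise stand in for a scoped
// lock type.
//
//     template<class MutexT>
//     struct Accumulator {
//         MutexT mutex;
//         double total = 0.0;
//         void add(double v)
//         {
//             std::lock_guard<MutexT> lock(mutex);   // no-op for null_mutex
//             total += v;
//         }
//     };
//
//     Accumulator<std::mutex> locked;      // real locking
//     Accumulator<null_mutex> lock_free;   // same code, zero lock overhead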
79 
80 
81 
82 using std::mutex;
83 using std::recursive_mutex;
84 using std::thread;
85 typedef std::lock_guard<mutex> lock_guard;
86 typedef std::lock_guard<recursive_mutex> recursive_lock_guard;
87 typedef std::lock_guard<std::recursive_timed_mutex> recursive_timed_lock_guard;
88 
89 
90 
91 /// Yield the processor for the rest of the timeslice.
92 /// DEPRECATED(2.4): Use std::this_thread::yield() instead.
93 inline void
94 yield() noexcept
95 {
96  std::this_thread::yield();
97 }
98 
99 
100 
101 // Slight pause
102 inline void
103 pause(int delay) noexcept
104 {
105 #if defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__))
106  for (int i = 0; i < delay; ++i)
107  __asm__ __volatile__("pause;");
108 
109 #elif defined(__GNUC__) && (defined(__arm__) || defined(__s390__))
110  for (int i = 0; i < delay; ++i)
111  __asm__ __volatile__("NOP;");
112 
113 #elif defined(_MSC_VER)
114  for (int i = 0; i < delay; ++i) {
115  // A reimplementation of winnt.h YieldProcessor,
116  // to avoid including windows headers.
117  #if defined(_M_AMD64)
118  _mm_pause();
119  #elif defined(_M_ARM64) || defined(_M_HYBRID_X86_ARM64)
120  __dmb(_ARM64_BARRIER_ISHST);
121  __yield();
122  #elif defined(_M_ARM)
123  __dmb(_ARM_BARRIER_ISHST);
124  __yield();
125  #else
126  _asm pause
127  #endif
128  }
129 
130 #else
131  // No pause on this platform, just punt
132  for (int i = 0; i < delay; ++i)
133  ;
134 #endif
135 }
136 
137 
138 
139 // Helper class to deliver ever longer pauses until we yield our timeslice.
140 class atomic_backoff {
141 public:
142  atomic_backoff(int pausemax = 16) noexcept
143  : m_count(1)
144  , m_pausemax(pausemax)
145  {
146  }
147 
148  void operator()() noexcept
149  {
150  if (m_count <= m_pausemax) {
151  pause(m_count);
152  m_count *= 2;
153  } else {
154  std::this_thread::yield();
155  }
156  }
157 
158 private:
159  int m_count;
160  int m_pausemax;
161 };
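// Usage sketch (illustrative only; `ready` and wait_until_ready are
// hypothetical): spin-waiting on a flag set by another thread, easing off
// the CPU as the wait grows longer.
//
//     std::atomic<bool> ready { false };
//
//     void wait_until_ready()
//     {
//         atomic_backoff backoff;   // pauses double in length up to pausemax
//         while (!ready.load(std::memory_order_acquire))
//             backoff();            // pause; eventually yields the timeslice
//     }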
162 
163 
164 
165 
166 /// A spin_mutex is semantically equivalent to a regular mutex, except
167 /// for the following:
168 /// - A spin_mutex is just 1 byte, whereas a regular mutex is quite
169 /// large (44 bytes for pthread).
170 /// - A spin_mutex is extremely fast to lock and unlock, whereas a regular
171 /// mutex is surprisingly expensive just to acquire a lock.
172 /// - A spin_mutex takes CPU while it waits, so this can be very
173 /// wasteful compared to a regular mutex that blocks (gives up its
174 /// CPU slices until it acquires the lock).
175 ///
176 /// The bottom line is that mutex is the usual choice, but in cases where
177 /// you need to acquire locks very frequently, but only need to hold the
178 /// lock for a very short period of time, you may save runtime by using
179 /// a spin_mutex, even though it's non-blocking.
180 ///
181 /// N.B. A spin_mutex is only the size of a bool. To avoid "false
182 /// sharing", be careful not to put two spin_mutex objects on the same
183 /// cache line (within 128 bytes of each other), or the two mutexes may
184 /// effectively (and wastefully) lock against each other.
185 ///
186 class spin_mutex {
187 public:
188  spin_mutex(void) noexcept {}
189  ~spin_mutex(void) noexcept {}
190 
191  /// Copy constructor -- initialize to unlocked.
192  ///
193  spin_mutex(const spin_mutex&) noexcept {}
194 
195  /// Assignment does not do anything, since lockedness should not
196  /// transfer.
197  const spin_mutex& operator=(const spin_mutex&) noexcept { return *this; }
198 
199  /// Acquire the lock, spin until we have it.
200  ///
201  void lock() noexcept
202  {
203  // To avoid spinning too tightly, we use the atomic_backoff to
204  // provide increasingly longer pauses, and if the lock is under
205  // lots of contention, eventually yield the timeslice.
206  atomic_backoff backoff;
207 
208  // Try to get ownership of the lock. Through experimentation, we
209  // found that OIIO_UNLIKELY makes this just a bit faster on gcc
210  // x86/x86_64 systems.
211  while (!OIIO_UNLIKELY(try_lock())) {
212 #if OIIO_THREAD_ALLOW_DCLP
213  // The full try_lock() involves a test_and_set, which
214  // writes memory, and that will lock the bus. But a normal
215  // read of m_locked will let us spin until the value
216  // changes, without locking the bus. So it's faster to
217  // check in this manner until the mutex appears to be free.
218  // HOWEVER... Thread Sanitizer thinks this is an instance of
219  // an unsafe "double checked lock pattern" (DCLP) and flags it
220  // as an error. I think it's a false positive, because the
221  // outer loop is still an atomic check, the inner non-atomic
222  // loop only serves to delay, and can't lead to a true data
223  // race. But we provide this build-time switch to, at least,
224  // give a way to use tsan for other checks.
225  do {
226  backoff();
227  } while (*(volatile bool*)&m_locked);
228 #else
229  backoff();
230 #endif
231  }
232  }
233 
234  /// Release the lock that we hold.
235  ///
236  void unlock() noexcept
237  {
238  // Fastest way to do it is with a clear with "release" semantics
239  m_locked.clear(std::memory_order_release);
240  }
241 
242  /// Try to acquire the lock. Return true if we have it, false if
243  /// somebody else is holding the lock.
244  bool try_lock() noexcept
245  {
246  return !m_locked.test_and_set(std::memory_order_acquire);
247  }
248 
249  /// Helper class: scoped lock for a spin_mutex -- grabs the lock upon
250  /// construction, releases the lock when it exits scope.
251  class lock_guard {
252  public:
253  lock_guard(spin_mutex& fm) noexcept
254  : m_fm(fm)
255  {
256  m_fm.lock();
257  }
258  ~lock_guard() noexcept { m_fm.unlock(); }
259 
260  private:
261  lock_guard() = delete;
262  lock_guard(const lock_guard& other) = delete;
263  lock_guard& operator=(const lock_guard& other) = delete;
264  spin_mutex& m_fm;
265  };
266 
267 private:
268  std::atomic_flag m_locked = ATOMIC_FLAG_INIT; // initialize to unlocked
269 };
270 
271 
272 typedef spin_mutex::lock_guard spin_lock;
273 
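// Usage sketch (illustrative only; stats_mutex, items_processed, and
// count_item are hypothetical): a very short critical section, entered
// frequently, is exactly the case where spin_mutex beats a regular mutex.
//
//     static spin_mutex stats_mutex;
//     static long long items_processed = 0;
//
//     void count_item()
//     {
//         // spin_lock (spin_mutex::lock_guard) locks on construction and
//         // unlocks when it goes out of scope.
//         spin_lock lock(stats_mutex);
//         ++items_processed;
//     }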
274 
275 
276 #if 0
277 
278 // OLD CODE vvvvvvvv
279 
280 
281 /// Spinning reader/writer mutex. This is just like spin_mutex, except
282 /// that there are separate locking mechanisms for "writers" (exclusive
283 /// holders of the lock, presumably because they are modifying whatever
284 /// the lock is protecting) and "readers" (non-exclusive, non-modifying
285 /// tasks that may access the protectee simultaneously).
286 class spin_rw_mutex {
287 public:
288  /// Default constructor -- initialize to unlocked.
289  ///
290  spin_rw_mutex (void) { m_readers = 0; }
291 
292  ~spin_rw_mutex (void) { }
293 
294  /// Copy constructor -- initialize to unlocked.
295  ///
296  spin_rw_mutex (const spin_rw_mutex &) { m_readers = 0; }
297 
298  /// Assignment does not do anything, since lockedness should not
299  /// transfer.
300  const spin_rw_mutex& operator= (const spin_rw_mutex&) { return *this; }
301 
302  /// Acquire the reader lock.
303  ///
304  void read_lock () {
305  // Spin until there are no writers active
306  m_locked.lock();
307  // Register ourself as a reader
308  ++m_readers;
309  // Release the lock, to let other readers work
310  m_locked.unlock();
311  }
312 
313  /// Release the reader lock.
314  ///
315  void read_unlock () {
316  --m_readers; // it's atomic, no need to lock to release
317  }
318 
319  /// Acquire the writer lock.
320  ///
321  void write_lock () {
322  // Make sure no new readers (or writers) can start
323  m_locked.lock();
324  // Spin until the last reader is done, at which point we will be
325  // the sole owners and nobody else (reader or writer) can acquire
326  // the resource until we release it.
327 #if OIIO_THREAD_ALLOW_DCLP
328  while (*(volatile int *)&m_readers > 0)
329  ;
330 #else
331  while (m_readers > 0)
332  ;
333 #endif
334  }
335 
336  /// Release the writer lock.
337  ///
338  void write_unlock () {
339  // Let other readers or writers get the lock
340  m_locked.unlock ();
341  }
342 
343  /// Acquire an exclusive ("writer") lock.
344  void lock () { write_lock(); }
345 
346  /// Release an exclusive ("writer") lock.
347  void unlock () { write_unlock(); }
348 
349  /// Acquire a shared ("reader") lock.
350  void lock_shared () { read_lock(); }
351 
352  /// Release a shared ("reader") lock.
353  void unlock_shared () { read_unlock(); }
354 
355  /// Helper class: scoped read lock for a spin_rw_mutex -- grabs the
356  /// read lock upon construction, releases the lock when it exits scope.
357  class read_lock_guard {
358  public:
359  read_lock_guard (spin_rw_mutex &fm) : m_fm(fm) { m_fm.read_lock(); }
360  ~read_lock_guard () { m_fm.read_unlock(); }
361  private:
362  read_lock_guard(); // Do not implement
363  read_lock_guard(const read_lock_guard& other); // Do not implement
364  read_lock_guard& operator = (const read_lock_guard& other); // Do not implement
365  spin_rw_mutex & m_fm;
366  };
367 
368  /// Helper class: scoped write lock for a spin_rw_mutex -- grabs the
369  /// write lock upon construction, releases the lock when it exits scope.
370  class write_lock_guard {
371  public:
372  write_lock_guard (spin_rw_mutex &fm) : m_fm(fm) { m_fm.write_lock(); }
373  ~write_lock_guard () { m_fm.write_unlock(); }
374  private:
375  write_lock_guard(); // Do not implement
376  write_lock_guard(const write_lock_guard& other); // Do not implement
377  write_lock_guard& operator = (const write_lock_guard& other); // Do not implement
378  spin_rw_mutex & m_fm;
379  };
380 
381 private:
382  OIIO_CACHE_ALIGN
383  spin_mutex m_locked; // write lock
384  char pad1_[OIIO_CACHE_LINE_SIZE-sizeof(spin_mutex)];
385  OIIO_CACHE_ALIGN
386  atomic_int m_readers; // number of readers
387  char pad2_[OIIO_CACHE_LINE_SIZE-sizeof(atomic_int)];
388 };
389 
390 
391 #else
392 
393 // vvv New spin rw lock Oct 2017
394 
395 /// Spinning reader/writer mutex. This is just like spin_mutex, except
396 /// that there are separate locking mechanisms for "writers" (exclusive
397 /// holders of the lock, presumably because they are modifying whatever
398 /// the lock is protecting) and "readers" (non-exclusive, non-modifying
399 /// tasks that may access the protectee simultaneously).
400 class spin_rw_mutex {
401 public:
402  /// Default constructor -- initialize to unlocked.
403  ///
404  spin_rw_mutex() noexcept {}
405 
406  ~spin_rw_mutex() noexcept {}
407 
408  // Do not allow copy or assignment.
409  spin_rw_mutex(const spin_rw_mutex&) = delete;
410  const spin_rw_mutex& operator=(const spin_rw_mutex&) = delete;
411 
412  /// Acquire the reader lock.
413  ///
414  void read_lock() noexcept
415  {
416  // first increase the readers, and if it turned out nobody was
417  // writing, we're done. This means that acquiring a read when nobody
418  // is writing is a single atomic operation.
419  int oldval = m_bits.fetch_add(1, std::memory_order_acquire);
420  if (!(oldval & WRITER))
421  return;
422  // Oops, we incremented readers but somebody was writing. Backtrack
423  // by subtracting, and do things the hard way.
424  int expected = (--m_bits) & NOTWRITER;
425 
426  // Do compare-and-exchange until we can increase the number of
427  // readers by one and have no writers.
428  if (m_bits.compare_exchange_weak(expected, expected + 1,
429  std::memory_order_acquire))
430  return;
431  atomic_backoff backoff;
432  do {
433  backoff();
434  expected = m_bits.load() & NOTWRITER;
435  } while (!m_bits.compare_exchange_weak(expected, expected + 1,
436  std::memory_order_acquire));
437  }
438 
439  /// Release the reader lock.
440  ///
441  void read_unlock() noexcept
442  {
443  // Atomically reduce the number of readers. It's at least 1,
444  // and the WRITER bit should definitely not be set, so this just
445  // boils down to an atomic decrement of m_bits.
446  m_bits.fetch_sub(1, std::memory_order_release);
447  }
448 
449  /// Acquire the writer lock.
450  ///
451  void write_lock() noexcept
452  {
453  // Do compare-and-exchange until we have just ourselves as writer
454  int expected = 0;
455  if (m_bits.compare_exchange_weak(expected, WRITER,
456  std::memory_order_acquire))
457  return;
458  atomic_backoff backoff;
459  do {
460  backoff();
461  expected = 0;
462  } while (!m_bits.compare_exchange_weak(expected, WRITER,
463  std::memory_order_acquire));
464  }
465 
466  /// Release the writer lock.
467  ///
468  void write_unlock() noexcept
469  {
470  // Remove the writer bit
471  m_bits.fetch_sub(WRITER, std::memory_order_release);
472  }
473 
474  /// lock() is a synonym for exclusive (write) lock.
475  void lock() { write_lock(); }
476 
477  /// unlock() is a synonym for exclusive (write) unlock.
478  void unlock() { write_unlock(); }
479 
480  /// Helper class: scoped read lock for a spin_rw_mutex -- grabs the
481  /// read lock upon construction, releases the lock when it exits scope.
482  class read_lock_guard {
483  public:
484  read_lock_guard (spin_rw_mutex &fm) noexcept : m_fm(fm) { m_fm.read_lock(); }
485  ~read_lock_guard () noexcept { m_fm.read_unlock(); }
486  private:
487  read_lock_guard(const read_lock_guard& other) = delete;
488  read_lock_guard& operator = (const read_lock_guard& other) = delete;
489  spin_rw_mutex & m_fm;
490  };
491 
492  /// Helper class: scoped write lock for a spin_rw_mutex -- grabs the
493  /// write lock upon construction, releases the lock when it exits scope.
494  class write_lock_guard {
495  public:
496  write_lock_guard (spin_rw_mutex &fm) noexcept : m_fm(fm) { m_fm.write_lock(); }
497  ~write_lock_guard () noexcept { m_fm.write_unlock(); }
498  private:
499  write_lock_guard(const write_lock_guard& other) = delete;
500  write_lock_guard& operator = (const write_lock_guard& other) = delete;
501  spin_rw_mutex & m_fm;
502  };
503 
504 private:
505  // Use one word to hold the reader count, with a high bit indicating
506  // that it's locked for writing. This will only work if we have
507  // fewer than 2^30 simultaneous readers. I think that should hold
508  // us for some time.
509  enum { WRITER = 1<<30, NOTWRITER = WRITER-1 };
510  std::atomic<int> m_bits { 0 };
511 };
512 
513 #endif
514 
515 
516 typedef spin_rw_mutex::read_lock_guard spin_rw_read_lock;
517 typedef spin_rw_mutex::write_lock_guard spin_rw_write_lock;
518 
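// Usage sketch (illustrative only; table, lookup, and insert are
// hypothetical): many concurrent readers with an occasional writer.
//
//     static spin_rw_mutex table_mutex;
//     static std::map<std::string, int> table;   // needs <map>, <string>
//
//     int lookup(const std::string& key)
//     {
//         spin_rw_read_lock lock(table_mutex);    // shared (reader) access
//         auto it = table.find(key);
//         return it == table.end() ? -1 : it->second;
//     }
//
//     void insert(const std::string& key, int value)
//     {
//         spin_rw_write_lock lock(table_mutex);   // exclusive (writer) access
//         table[key] = value;
//     }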
519 
520 
521 /// Mutex pool. Sometimes, we have lots of objects that need to be
522 /// individually locked for thread safety, but two separate objects don't
523 /// need to lock against each other. If there are many more objects than
524 /// threads, it's wasteful for each object to contain its own mutex. So a
525 /// solution is to make a mutex_pool -- a collection of several mutexes.
526 /// Each object uses a hash to choose a consistent mutex for itself, one
527 /// that is unlikely to be locked simultaneously by a different object.
528 /// Semantically, it looks rather like an associative array of mutexes. We
529 /// also ensure that the mutexes are all on different cache lines, to ensure
530 /// that they don't exhibit false sharing. Try to choose Bins larger than
531 /// the expected number of threads that will be simultaneously locking
532 /// mutexes.
533 template<class Mutex, class Key, class Hash, size_t Bins = 16>
534 class mutex_pool {
535 public:
536  mutex_pool() noexcept {}
537  Mutex& operator[](const Key& key) noexcept { return m_mutex[m_hash(key) % Bins].m; }
538 
539 private:
540  // Helper type -- force cache line alignment. This should make an array
541  // of these also have padding so that each individual mutex is aligned
542  // to its own cache line, thus eliminating any "false sharing."
543  struct AlignedMutex {
544  OIIO_CACHE_ALIGN Mutex m;
545  };
546 
547  AlignedMutex m_mutex[Bins];
548  Hash m_hash;
549 };
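// Usage sketch (illustrative only; item_locks and touch_item are
// hypothetical): many objects share a small, cache-line-padded set of
// mutexes, keyed by a hash of each object's name.
//
//     static mutex_pool<spin_mutex, std::string, std::hash<std::string>, 64>
//         item_locks;
//
//     void touch_item(const std::string& name)
//     {
//         spin_mutex::lock_guard lock(item_locks[name]);
//         // ... modify only the item associated with `name` ...
//     }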
550 
551 
552 
553 /// Simple thread group class: lets you spawn a group of new threads,
554 /// then wait for them to all complete.
555 class thread_group {
556 public:
557  thread_group() {}
558  ~thread_group() { join_all(); }
559 
560  void add_thread(thread* t)
561  {
562  if (t) {
563  lock_guard lock(m_mutex);
564  m_threads.emplace_back(t);
565  }
566  }
567 
568  template<typename FUNC, typename... Args>
569  thread* create_thread(FUNC func, Args&&... args)
570  {
571  thread* t = new thread(func, std::forward<Args>(args)...);
572  add_thread(t);
573  return t;
574  }
575 
576  void join_all()
577  {
578  lock_guard lock(m_mutex);
579  for (auto& t : m_threads)
580  if (t->joinable())
581  t->join();
582  }
583 
584  size_t size() const
585  {
586  lock_guard lock(m_mutex);
587  return m_threads.size();
588  }
589 
590 private:
591  mutable mutex m_mutex;
592  std::vector<std::unique_ptr<thread>> m_threads;
593 };
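// Usage sketch (illustrative only; worker and run_workers are hypothetical):
// spawn several threads, then block until they have all finished.
//
//     void worker(int index) { /* ... do some work ... */ }
//
//     void run_workers(int nthreads)
//     {
//         thread_group group;
//         for (int i = 0; i < nthreads; ++i)
//             group.create_thread(worker, i);
//         group.join_all();   // wait for every spawned thread
//     }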
594 
595 
596 
597 /// thread_pool is a persistent set of threads watching a queue to which
598 /// tasks can be submitted.
599 ///
600 /// Call default_thread_pool() to retrieve a pointer to a single shared
601 /// thread_pool that will be initialized the first time it's needed, running
602 /// a number of threads corresponding to the number of cores on the machine.
603 ///
604 /// It's possible to create other pools, but it's not something that's
605 /// recommended unless you really know what you're doing and are careful
606 /// that the sum of threads across all pools doesn't cause you to be highly
607 /// over-threaded. An example of when this might be useful is if you want
608 /// one pool of 4 threads to handle I/O without interference from a separate
609 /// pool of 4 (other) threads handling computation.
610 ///
611 /// Submitting an asynchronous task to the queue follows this general
612 /// pattern:
613 ///
614 /// /* func that takes a thread ID followed possibly by more args */
615 /// result_t my_func (int thread_id, Arg1 arg1, ...) { }
616 /// pool->push (my_func, arg1, ...);
617 ///
618 /// If you just want to "fire and forget", then:
619 ///
620 /// pool->push (func, ...args...);
621 ///
622 /// But if you need a result, or simply need to know when the task has
623 /// completed, note that the push() method will return a future<result_t>
624 /// that you can check, like this:
625 ///
626 /// std::future<result_t> f = pool->push (my_task);
627 ///
628 /// And then you can
629 ///
630 /// find out if it's done: if (f.valid()) ...
631 /// wait for it to get done: f.wait();
632 /// get result (waiting if necessary): result_t r = f.get();
633 ///
634 /// A common idiom is to fire a bunch of sub-tasks at the queue, and then
635 /// wait for them to all complete. We provide a helper class, task_set,
636 /// to make this easy:
637 ///
638 /// task_set tasks (pool);
639 /// for (int i = 0; i < n_subtasks; ++i)
640 /// tasks.push (pool->push (myfunc));
641 /// tasks.wait ();
642 ///
643 /// Note that the tasks.wait() is optional -- it will be called
644 /// automatically when the task_set exits its scope.
645 ///
646 /// The task function's first argument, the thread_id, is the thread number
647 /// for the pool, or -1 if it's being executed by a non-pool thread (this
648 /// can happen in cases where the whole pool is occupied and the calling
649 /// thread contributes to running the work load).
650 ///
651 /// Thread pool. Have fun, be safe.
652 ///
653 class OIIO_UTIL_API thread_pool {
654 public:
655  /// Initialize the pool. This implicitly calls resize() to set the
656  /// number of worker threads, defaulting to a number of workers that is
657  /// one less than the number of hardware cores.
658  thread_pool(int nthreads = -1);
659  ~thread_pool();
660 
661  /// How many threads are in the pool?
662  int size() const;
663 
664  /// Sets the number of worker threads in the pool. If the pool size is
665  /// 0, any tasks added to the pool will be executed immediately by the
666  /// calling thread. Requesting nthreads < 0 will cause it to resize to
667  /// the number of hardware cores minus one (one less, to account for the
668  /// fact that the calling thread will also contribute). BEWARE! Resizing
669  /// the queue should not be done while jobs are running.
670  void resize(int nthreads = -1);
671 
672  /// Return the number of currently idle threads in the queue. Zero
673  /// means the queue is fully engaged.
674  int idle() const;
675 
676  /// Run the user's function, which takes one argument: the int id of the
677  /// running thread. The returned value is a templatized std::future, where
678  /// the user can get the result and rethrow any exceptions. If the queue
679  /// has no worker threads, the task will be run immediately by the
680  /// calling thread.
681  template<typename F> auto push(F&& f) -> std::future<decltype(f(0))>
682  {
683  auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(
684  std::forward<F>(f));
685  if (size() < 1) {
686  (*pck)(-1); // No worker threads, run it with the calling thread
687  } else {
688  auto _f = new std::function<void(int id)>(
689  [pck](int id) { (*pck)(id); });
690  push_queue_and_notify(_f);
691  }
692  return pck->get_future();
693  }
694 
695  /// Run the user's function that accepts an arbitrary set of arguments
696  /// (also passed). The returned value is a templatized std::future, where
697  /// the user can get the result and rethrow any exceptions. If the queue
698  /// has no worker threads, the task will be run immediately by the
699  /// calling thread.
700  template<typename F, typename... Rest>
701  auto push (F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
702  auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
703  std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
704  );
705  if (size() < 1) {
706  (*pck)(-1); // No worker threads, run it with the calling thread
707  } else {
708  auto _f = new std::function<void(int id)>([pck](int id) {
709  (*pck)(id);
710  });
711  push_queue_and_notify (_f);
712  }
713  return pck->get_future();
714  }
715 
716  /// If there are any tasks on the queue, pull one off and run it (on
717  /// this calling thread) and return true. Otherwise (there are no
718  /// pending jobs), return false immediately. This utility is what makes
719  /// it possible for non-pool threads to also run tasks from the queue
720  /// when they would ordinarily be idle. The thread id of the caller
721  /// should be passed.
722  bool run_one_task(std::thread::id id);
723 
724  /// Return true if the calling thread is part of the thread pool. This
725  /// can be used to limit a pool thread from unadvisedly adding its own
726  /// subtasks to clog up the pool.
727  /// DEPRECATED(2.1) -- use is_worker() instead.
728  bool this_thread_is_in_pool() const;
729 
730  /// Register a thread (not already in the thread pool itself) as working
731  /// on tasks in the pool. This is used to avoid recursion.
732  void register_worker(std::thread::id id);
733  /// De-register a thread, saying it is no longer in the process of
734  /// taking work from the thread pool.
735  void deregister_worker(std::thread::id id);
736  /// Is the thread in the pool or currently engaged in taking tasks from
737  /// the pool?
738  bool is_worker(std::thread::id id) const;
739  bool is_worker() const { return is_worker(std::this_thread::get_id()); }
740  // Non-const versions: DEPRECATED(2.1)
741  bool is_worker(std::thread::id id);
742 
743  /// How many jobs are waiting to run? (Use with caution! Can be out of
744  /// date by the time you look at it.)
745  size_t jobs_in_queue() const;
746 
747  /// Is the pool very busy? Meaning that there are significantly more
748  /// tasks in the queue waiting to run than there are threads in the
749  /// pool. It may be wise for a caller to check this before submitting
750  /// tasks -- if the queue is very busy, it's probably more expedient to
751  /// execute the code directly rather than add it to an oversubscribed
752  /// queue.
753  bool very_busy() const;
754 
755 private:
756  // Disallow copy construction and assignment
757  thread_pool(const thread_pool&) = delete;
758  thread_pool(thread_pool&&) = delete;
759  thread_pool& operator=(const thread_pool&) = delete;
760  thread_pool& operator=(thread_pool&&) = delete;
761 
762  // PIMPL pattern hides all the guts far away from the public API
763  class Impl;
764  std::unique_ptr<Impl> m_impl;
765 
766  // Utility function that helps us hide the implementation
767  void push_queue_and_notify(std::function<void(int id)>* f);
768 };
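// Usage sketch (illustrative only; sum_range and parallel_sum are
// hypothetical): submit tasks to the shared pool and collect the results
// through the returned futures. Note the mandatory thread-id first argument.
//
//     long long sum_range(int thread_id, int begin, int end)
//     {
//         long long s = 0;
//         for (int i = begin; i < end; ++i)
//             s += i;
//         return s;
//     }
//
//     long long parallel_sum(int n)
//     {
//         thread_pool* pool = default_thread_pool();
//         std::vector<std::future<long long>> futures;
//         for (int i = 0; i < n; i += 1000)
//             futures.push_back(pool->push(sum_range, i, std::min(i + 1000, n)));
//         long long total = 0;
//         for (auto& f : futures)
//             total += f.get();   // blocks until that task has finished
//         return total;
//     }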
769 
770 
771 
772 /// Return a pointer to the "default" shared thread pool. In almost all
773 /// ordinary circumstances, you should use this exclusively to get a single
774 /// shared thread pool, since creating multiple thread pools could result in
775 /// hilariously over-threading your application. Note that this call may
776 /// (safely, and only once) trigger creation of the thread pool and its
777 /// worker threads if it has not yet been created.
778 OIIO_UTIL_API thread_pool* default_thread_pool();
779 
780 /// If a thread pool has been created, this call will safely terminate its
781 /// worker threads. This should presumably be called by an application
782 /// immediately before it exits, when it is confident the thread pool will
783 /// no longer be needed.
784 OIIO_UTIL_API void default_thread_pool_shutdown();
785 
786 
787 
788 /// task_set is a group of future<void>'s from a thread_pool that you can
789 /// add to, and when you either call wait() or just leave the task_set's
790 /// scope, it will wait for all the tasks in the set to be done before
791 /// proceeding.
792 ///
793 /// A typical idiom for using this is:
794 ///
795 /// void myfunc (int id) { ... do something ... }
796 ///
797 /// thread_pool* pool (default_thread_pool());
798 /// {
799 /// task_set tasks (pool);
800 /// // Launch a bunch of tasks into the thread pool
801 /// for (int i = 0; i < ntasks; ++i)
802 /// tasks.push (pool->push (myfunc));
803 /// // The following brace, by ending the scope of 'tasks', will
804 /// // wait for all those queue tasks to finish.
805 /// }
806 ///
807 class OIIO_UTIL_API task_set {
808 public:
809  task_set(thread_pool* pool = nullptr)
810  : m_pool(pool ? pool : default_thread_pool())
811  , m_submitter_thread(std::this_thread::get_id())
812  {
813  }
814  ~task_set() { wait(); }
815 
816  task_set(const task_set&) = delete;
817  const task_set& operator=(const task_set&) = delete;
818 
819  // Return the thread id of the thread that set up this task_set and
820  // submitted its tasks to the thread pool.
821  std::thread::id submitter() const { return m_submitter_thread; }
822 
823  // Save a future (presumably returned by thread_pool::push()) as part
824  // of this task set.
825  void push(std::future<void>&& f)
826  {
827  OIIO_DASSERT(
828  std::this_thread::get_id() == submitter()
829  && "All tasks in a task_set should be added by the same thread");
830  m_futures.emplace_back(std::move(f));
831  }
832 
833  // Wait for the given taskindex (0..n-1, where n is the number of tasks
834  // submitted as part of this task_set). If block == true, fully block
835  // while waiting for that task to finish. If block is false, then busy
836  // wait, and opportunistically run queue tasks yourself while you are
837  // waiting for the task to finish.
838  void wait_for_task(size_t taskindex, bool block = false);
839 
840  // Wait for all tasks in the set to finish. If block == true, fully
841  // block while waiting for the pool threads to all finish. If block is
842  // false, then busy wait, and opportunistically run queue tasks yourself
843  // while you are waiting for other tasks to finish.
844  void wait(bool block = false);
845 
846  // Debugging sanity check, called after wait(), to ensure that all the
847  // tasks were completed.
848  void check_done()
849  {
850  const std::chrono::milliseconds wait_time(0);
851  for (auto&& f : m_futures)
852  OIIO_ASSERT(f.wait_for(wait_time) == std::future_status::ready);
853  }
854 
855 private:
856  thread_pool* m_pool;
857  std::thread::id m_submitter_thread;
858  std::vector<std::future<void>> m_futures;
859 };
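// Usage sketch (illustrative only; process_chunk and process_all are
// hypothetical): fan out chunks of work and wait for all of them before
// returning. The explicit tasks.wait() is optional, since the task_set
// destructor waits as well.
//
//     void process_chunk(int thread_id, size_t first, size_t last) { /* ... */ }
//
//     void process_all(size_t nitems, size_t chunk)
//     {
//         thread_pool* pool = default_thread_pool();
//         task_set tasks(pool);
//         for (size_t first = 0; first < nitems; first += chunk)
//             tasks.push(pool->push(process_chunk, first,
//                                   std::min(first + chunk, nitems)));
//         tasks.wait();
//     }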
860 
861 