thread.h
// Copyright 2008-present Contributors to the OpenImageIO project.
// SPDX-License-Identifier: BSD-3-Clause
// https://github.com/OpenImageIO/oiio/blob/master/LICENSE.md

// clang-format off

/////////////////////////////////////////////////////////////////////////
/// @file thread.h
///
/// @brief Wrappers and utilities for multithreading.
/////////////////////////////////////////////////////////////////////////


#pragma once

#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include <OpenImageIO/atomic.h>
#include <OpenImageIO/dassert.h>
#include <OpenImageIO/export.h>
#include <OpenImageIO/oiioversion.h>
#include <OpenImageIO/platform.h>


// OIIO_THREAD_ALLOW_DCLP, if set to 0, prevents us from using a dodgy
// "double checked lock pattern" (DCLP). We are very careful to construct
// it safely and correctly, and these uses improve thread performance for
// us. But it confuses Thread Sanitizer, so this switch allows you to turn
// it off. Also set to 0 if you don't believe that we are correct in
// allowing this construct on all platforms.
#ifndef OIIO_THREAD_ALLOW_DCLP
#    define OIIO_THREAD_ALLOW_DCLP 1
#endif



// Some helpful links:
//
// Descriptions of the "new" gcc atomic intrinsics:
//   https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html
// Old gcc atomic intrinsics:
//   https://gcc.gnu.org/onlinedocs/gcc-4.4.2/gcc/Atomic-Builtins.html
// C++11 and beyond std::atomic:
//   http://en.cppreference.com/w/cpp/atomic


OIIO_NAMESPACE_BEGIN

/// Null mutex that can be substituted for a real one to test how much
/// overhead is associated with a particular mutex.
class null_mutex {
public:
    null_mutex() noexcept {}
    ~null_mutex() noexcept {}
    void lock() noexcept {}
    void unlock() noexcept {}
    void lock_shared() noexcept {}
    void unlock_shared() noexcept {}
    bool try_lock() noexcept { return true; }
};

/// Null lock that can be substituted for a real one to test how much
/// overhead is associated with a particular lock.
template<typename T> class null_lock {
public:
    null_lock(T& /*m*/) noexcept {}
};
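
// Usage sketch (hypothetical example): a class templated on its mutex type
// can be instantiated with null_mutex to measure how much of its runtime is
// spent on locking. The `Registry` class and `add` method are assumed names
// for illustration only.
//
//     template<class MutexT = std::mutex>
//     class Registry {
//     public:
//         void add(int v) {
//             std::lock_guard<MutexT> lock(m_mutex);
//             m_items.push_back(v);
//         }
//     private:
//         MutexT m_mutex;
//         std::vector<int> m_items;
//     };
//
//     Registry<std::mutex> locked;    // real locking
//     Registry<null_mutex> unlocked;  // same code, zero lock overhead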



using std::mutex;
using std::recursive_mutex;
using std::thread;
typedef std::lock_guard<mutex> lock_guard;
typedef std::lock_guard<recursive_mutex> recursive_lock_guard;



/// Yield the processor for the rest of the timeslice.
///
inline void
yield() noexcept
{
#if defined(__GNUC__)
    sched_yield();
#elif defined(_MSC_VER)
    SwitchToThread();
#else
#    error No yield on this platform.
#endif
}



// Slight pause
inline void
pause(int delay) noexcept
{
#if defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__))
    for (int i = 0; i < delay; ++i)
        __asm__ __volatile__("pause;");

#elif defined(__GNUC__) && (defined(__arm__) || defined(__s390__))
    for (int i = 0; i < delay; ++i)
        __asm__ __volatile__("NOP;");

#elif defined(_MSC_VER)
    for (int i = 0; i < delay; ++i) {
#    if defined(_WIN64)
        YieldProcessor();
#    else
        _asm pause
#    endif /* _WIN64 */
    }

#else
    // No pause on this platform, just punt
    for (int i = 0; i < delay; ++i)
        ;
#endif
}



// Helper class to deliver ever longer pauses until we yield our timeslice.
class atomic_backoff {
public:
    atomic_backoff(int pausemax = 16) noexcept
        : m_count(1)
        , m_pausemax(pausemax)
    {
    }

    void operator()() noexcept
    {
        if (m_count <= m_pausemax) {
            pause(m_count);
            m_count *= 2;
        } else {
            yield();
        }
    }

private:
    int m_count;
    int m_pausemax;
};
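
// Usage sketch (hypothetical example): spin-wait for a flag with
// progressively longer pauses, eventually yielding the timeslice.
// `ready` and `wait_until_ready` are assumed names for illustration.
//
//     std::atomic<bool> ready { false };   // set by another thread
//
//     void wait_until_ready() {
//         atomic_backoff backoff;          // starts with a short pause
//         while (!ready.load(std::memory_order_acquire))
//             backoff();                   // pause 1, 2, 4, ... then yield()
//     }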




/// A spin_mutex is semantically equivalent to a regular mutex, except
/// for the following:
///  - A spin_mutex is just 1 byte, whereas a regular mutex is quite
///    large (44 bytes for pthread).
///  - A spin_mutex is extremely fast to lock and unlock, whereas a regular
///    mutex is surprisingly expensive just to acquire a lock.
///  - A spin_mutex takes CPU while it waits, so this can be very
///    wasteful compared to a regular mutex that blocks (gives up its
///    CPU slices until it acquires the lock).
///
/// The bottom line is that mutex is the usual choice, but in cases where
/// you need to acquire locks very frequently, but only need to hold the
/// lock for a very short period of time, you may save runtime by using
/// a spin_mutex, even though it's non-blocking.
///
/// N.B. A spin_mutex is only the size of a bool. To avoid "false
/// sharing", be careful not to put two spin_mutex objects on the same
/// cache line (within 128 bytes of each other), or the two mutexes may
/// effectively (and wastefully) lock against each other.
///
class spin_mutex {
public:
    spin_mutex(void) noexcept {}
    ~spin_mutex(void) noexcept {}

    /// Copy constructor -- initialize to unlocked.
    ///
    spin_mutex(const spin_mutex&) noexcept {}

    /// Assignment does not do anything, since lockedness should not
    /// transfer.
    const spin_mutex& operator=(const spin_mutex&) noexcept { return *this; }

    /// Acquire the lock, spin until we have it.
    ///
    void lock() noexcept
    {
        // To avoid spinning too tightly, we use the atomic_backoff to
        // provide increasingly longer pauses, and if the lock is under
        // lots of contention, eventually yield the timeslice.
        atomic_backoff backoff;

        // Try to get ownership of the lock. Through experimentation, we
        // found that OIIO_UNLIKELY makes this just a bit faster on gcc
        // x86/x86_64 systems.
        while (!OIIO_UNLIKELY(try_lock())) {
#if OIIO_THREAD_ALLOW_DCLP
            // The full try_lock() involves a test_and_set, which
            // writes memory, and that will lock the bus. But a normal
            // read of m_locked will let us spin until the value
            // changes, without locking the bus. So it's faster to
            // check in this manner until the mutex appears to be free.
            // HOWEVER... Thread Sanitizer thinks this is an instance of
            // an unsafe "double checked lock pattern" (DCLP) and flags it
            // as an error. I think it's a false positive, because the
            // outer loop is still an atomic check, the inner non-atomic
            // loop only serves to delay, and can't lead to a true data
            // race. But we provide this build-time switch to, at least,
            // give a way to use tsan for other checks.
            do {
                backoff();
            } while (*(volatile bool*)&m_locked);
#else
            backoff();
#endif
        }
    }

    /// Release the lock that we hold.
    ///
    void unlock() noexcept
    {
        // Fastest way to do it is with a clear with "release" semantics
        m_locked.clear(std::memory_order_release);
    }

    /// Try to acquire the lock. Return true if we have it, false if
    /// somebody else is holding the lock.
    bool try_lock() noexcept
    {
        return !m_locked.test_and_set(std::memory_order_acquire);
    }

    /// Helper class: scoped lock for a spin_mutex -- grabs the lock upon
    /// construction, releases the lock when it exits scope.
    class lock_guard {
    public:
        lock_guard(spin_mutex& fm) noexcept
            : m_fm(fm)
        {
            m_fm.lock();
        }
        ~lock_guard() noexcept { m_fm.unlock(); }

    private:
        lock_guard() = delete;
        lock_guard(const lock_guard& other) = delete;
        lock_guard& operator=(const lock_guard& other) = delete;
        spin_mutex& m_fm;
    };

private:
    std::atomic_flag m_locked = ATOMIC_FLAG_INIT;  // initialize to unlocked
};


typedef spin_mutex::lock_guard spin_lock;


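// Usage sketch (hypothetical example): protect a shared counter with a
// spin_mutex, appropriate because the critical section is tiny. The names
// `stat_mutex`, `ncalls`, and `count_call` are assumed for illustration.
//
//     static spin_mutex stat_mutex;
//     static long long ncalls = 0;
//
//     void count_call() {
//         spin_lock lock(stat_mutex);   // scoped: locks now, unlocks at '}'
//         ++ncalls;
//     }
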
#if 0

// OLD CODE vvvvvvvv


/// Spinning reader/writer mutex. This is just like spin_mutex, except
/// that there are separate locking mechanisms for "writers" (exclusive
/// holders of the lock, presumably because they are modifying whatever
/// the lock is protecting) and "readers" (non-exclusive, non-modifying
/// tasks that may access the protectee simultaneously).
class spin_rw_mutex {
public:
    /// Default constructor -- initialize to unlocked.
    ///
    spin_rw_mutex (void) { m_readers = 0; }

    ~spin_rw_mutex (void) { }

    /// Copy constructor -- initialize to unlocked.
    ///
    spin_rw_mutex (const spin_rw_mutex &) { m_readers = 0; }

    /// Assignment does not do anything, since lockedness should not
    /// transfer.
    const spin_rw_mutex& operator= (const spin_rw_mutex&) { return *this; }

    /// Acquire the reader lock.
    ///
    void read_lock () {
        // Spin until there are no writers active
        m_locked.lock();
        // Register ourself as a reader
        ++m_readers;
        // Release the lock, to let other readers work
        m_locked.unlock();
    }

    /// Release the reader lock.
    ///
    void read_unlock () {
        --m_readers;  // it's atomic, no need to lock to release
    }

    /// Acquire the writer lock.
    ///
    void write_lock () {
        // Make sure no new readers (or writers) can start
        m_locked.lock();
        // Spin until the last reader is done, at which point we will be
        // the sole owners and nobody else (reader or writer) can acquire
        // the resource until we release it.
#if OIIO_THREAD_ALLOW_DCLP
        while (*(volatile int *)&m_readers > 0)
            ;
#else
        while (m_readers > 0)
            ;
#endif
    }

    /// Release the writer lock.
    ///
    void write_unlock () {
        // Let other readers or writers get the lock
        m_locked.unlock ();
    }

    /// Acquire an exclusive ("writer") lock.
    void lock () { write_lock(); }

    /// Release an exclusive ("writer") lock.
    void unlock () { write_unlock(); }

    /// Acquire a shared ("reader") lock.
    void lock_shared () { read_lock(); }

    /// Release a shared ("reader") lock.
    void unlock_shared () { read_unlock(); }

    /// Helper class: scoped read lock for a spin_rw_mutex -- grabs the
    /// read lock upon construction, releases the lock when it exits scope.
    class read_lock_guard {
    public:
        read_lock_guard (spin_rw_mutex &fm) : m_fm(fm) { m_fm.read_lock(); }
        ~read_lock_guard () { m_fm.read_unlock(); }
    private:
        read_lock_guard();  // Do not implement
        read_lock_guard(const read_lock_guard& other);  // Do not implement
        read_lock_guard& operator= (const read_lock_guard& other);  // Do not implement
        spin_rw_mutex & m_fm;
    };

    /// Helper class: scoped write lock for a spin_rw_mutex -- grabs the
    /// write lock upon construction, releases the lock when it exits scope.
    class write_lock_guard {
    public:
        write_lock_guard (spin_rw_mutex &fm) : m_fm(fm) { m_fm.write_lock(); }
        ~write_lock_guard () { m_fm.write_unlock(); }
    private:
        write_lock_guard();  // Do not implement
        write_lock_guard(const write_lock_guard& other);  // Do not implement
        write_lock_guard& operator= (const write_lock_guard& other);  // Do not implement
        spin_rw_mutex & m_fm;
    };

private:
    OIIO_CACHE_ALIGN
    spin_mutex m_locked;   // write lock
    char pad1_[OIIO_CACHE_LINE_SIZE-sizeof(spin_mutex)];
    OIIO_CACHE_ALIGN
    atomic_int m_readers;  // number of readers
    char pad2_[OIIO_CACHE_LINE_SIZE-sizeof(atomic_int)];
};


#else

// vvv New spin rw lock Oct 2017

/// Spinning reader/writer mutex. This is just like spin_mutex, except
/// that there are separate locking mechanisms for "writers" (exclusive
/// holders of the lock, presumably because they are modifying whatever
/// the lock is protecting) and "readers" (non-exclusive, non-modifying
/// tasks that may access the protectee simultaneously).
class spin_rw_mutex {
public:
    /// Default constructor -- initialize to unlocked.
    ///
    spin_rw_mutex() noexcept {}

    ~spin_rw_mutex() noexcept {}

    // Do not allow copy or assignment.
    spin_rw_mutex(const spin_rw_mutex&) = delete;
    const spin_rw_mutex& operator=(const spin_rw_mutex&) = delete;

    /// Acquire the reader lock.
    ///
    void read_lock() noexcept
    {
        // First increase the readers, and if it turned out nobody was
        // writing, we're done. This means that acquiring a read when nobody
        // is writing is a single atomic operation.
        int oldval = m_bits.fetch_add(1, std::memory_order_acquire);
        if (!(oldval & WRITER))
            return;
        // Oops, we incremented readers but somebody was writing. Backtrack
        // by subtracting, and do things the hard way.
        int expected = (--m_bits) & NOTWRITER;

        // Do compare-and-exchange until we can increase the number of
        // readers by one and have no writers.
        if (m_bits.compare_exchange_weak(expected, expected + 1,
                                         std::memory_order_acquire))
            return;
        atomic_backoff backoff;
        do {
            backoff();
            expected = m_bits.load() & NOTWRITER;
        } while (!m_bits.compare_exchange_weak(expected, expected + 1,
                                               std::memory_order_acquire));
    }

    /// Release the reader lock.
    ///
    void read_unlock() noexcept
    {
        // Atomically reduce the number of readers. It's at least 1,
        // and the WRITER bit should definitely not be set, so this just
        // boils down to an atomic decrement of m_bits.
        m_bits.fetch_sub(1, std::memory_order_release);
    }

    /// Acquire the writer lock.
    ///
    void write_lock() noexcept
    {
        // Do compare-and-exchange until we have just ourselves as writer
        int expected = 0;
        if (m_bits.compare_exchange_weak(expected, WRITER,
                                         std::memory_order_acquire))
            return;
        atomic_backoff backoff;
        do {
            backoff();
            expected = 0;
        } while (!m_bits.compare_exchange_weak(expected, WRITER,
                                               std::memory_order_acquire));
    }

    /// Release the writer lock.
    ///
    void write_unlock() noexcept
    {
        // Remove the writer bit
        m_bits.fetch_sub(WRITER, std::memory_order_release);
    }

    /// lock() is a synonym for exclusive (write) lock.
    void lock() { write_lock(); }

    /// unlock() is a synonym for exclusive (write) unlock.
    void unlock() { write_unlock(); }

    /// Helper class: scoped read lock for a spin_rw_mutex -- grabs the
    /// read lock upon construction, releases the lock when it exits scope.
    class read_lock_guard {
    public:
        read_lock_guard (spin_rw_mutex &fm) noexcept : m_fm(fm) { m_fm.read_lock(); }
        ~read_lock_guard () noexcept { m_fm.read_unlock(); }
    private:
        read_lock_guard(const read_lock_guard& other) = delete;
        read_lock_guard& operator= (const read_lock_guard& other) = delete;
        spin_rw_mutex & m_fm;
    };

    /// Helper class: scoped write lock for a spin_rw_mutex -- grabs the
    /// write lock upon construction, releases the lock when it exits scope.
    class write_lock_guard {
    public:
        write_lock_guard (spin_rw_mutex &fm) noexcept : m_fm(fm) { m_fm.write_lock(); }
        ~write_lock_guard () noexcept { m_fm.write_unlock(); }
    private:
        write_lock_guard(const write_lock_guard& other) = delete;
        write_lock_guard& operator= (const write_lock_guard& other) = delete;
        spin_rw_mutex & m_fm;
    };

private:
    // Use one word to hold the reader count, with a high bit indicating
    // that it's locked for writing. This will only work if we have
    // fewer than 2^30 simultaneous readers. I think that should hold
    // us for some time.
    enum { WRITER = 1<<30, NOTWRITER = WRITER-1 };
    std::atomic<int> m_bits { 0 };
};

#endif


typedef spin_rw_mutex::read_lock_guard spin_rw_read_lock;
typedef spin_rw_mutex::write_lock_guard spin_rw_write_lock;


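// Usage sketch (hypothetical example): many readers may hold the shared
// lock at once, while a writer gets exclusive access. The map `table`,
// its mutex, and the lookup/insert functions are assumed for illustration.
//
//     static spin_rw_mutex table_mutex;
//     static std::map<int, int> table;     // needs <map>
//
//     int lookup(int key) {
//         spin_rw_read_lock lock(table_mutex);    // shared (reader) lock
//         auto it = table.find(key);
//         return it == table.end() ? -1 : it->second;
//     }
//
//     void insert(int key, int value) {
//         spin_rw_write_lock lock(table_mutex);   // exclusive (writer) lock
//         table[key] = value;
//     }
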
/// Mutex pool. Sometimes, we have lots of objects that need to be
/// individually locked for thread safety, but two separate objects don't
/// need to lock against each other. If there are many more objects than
/// threads, it's wasteful for each object to contain its own mutex. So a
/// solution is to make a mutex_pool -- a collection of several mutexes.
/// Each object uses a hash to choose a consistent mutex for itself, one
/// that is unlikely to be simultaneously locked by a different object.
/// Semantically, it looks rather like an associative array of mutexes. We
/// also ensure that the mutexes are all on different cache lines, to ensure
/// that they don't exhibit false sharing. Try to choose Bins larger than
/// the expected number of threads that will be simultaneously locking
/// mutexes.
template<class Mutex, class Key, class Hash, size_t Bins = 16>
class mutex_pool {
public:
    mutex_pool() noexcept {}
    Mutex& operator[](const Key& key) noexcept { return m_mutex[m_hash(key) % Bins].m; }

private:
    // Helper type -- force cache line alignment. This should make an array
    // of these also have padding so that each individual mutex is aligned
    // to its own cache line, thus eliminating any "false sharing."
    struct AlignedMutex {
        OIIO_CACHE_ALIGN Mutex m;
    };

    AlignedMutex m_mutex[Bins];
    Hash m_hash;
};
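
// Usage sketch (hypothetical example): share 64 spin mutexes among many
// objects, keyed by object pointer. The `PtrHash` functor, `Object` type,
// and `touch` function are assumed names for illustration.
//
//     struct PtrHash {
//         size_t operator()(const void* p) const noexcept {
//             return reinterpret_cast<size_t>(p) >> 4;
//         }
//     };
//
//     static mutex_pool<spin_mutex, const void*, PtrHash, 64> pool;
//
//     void touch(Object* obj) {
//         spin_lock lock(pool[obj]);   // same object always maps to the
//         obj->update();               // same mutex; different objects
//     }                                // rarely collide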



/// Simple thread group class: lets you spawn a group of new threads,
/// then wait for them to all complete.
class thread_group {
public:
    thread_group() {}
    ~thread_group() { join_all(); }

    void add_thread(thread* t)
    {
        if (t) {
            lock_guard lock(m_mutex);
            m_threads.emplace_back(t);
        }
    }

    template<typename FUNC, typename... Args>
    thread* create_thread(FUNC func, Args&&... args)
    {
        thread* t = new thread(func, std::forward<Args>(args)...);
        add_thread(t);
        return t;
    }

    void join_all()
    {
        lock_guard lock(m_mutex);
        for (auto& t : m_threads)
            if (t->joinable())
                t->join();
    }

    size_t size() const
    {
        lock_guard lock(m_mutex);
        return m_threads.size();
    }

private:
    mutable mutex m_mutex;
    std::vector<std::unique_ptr<thread>> m_threads;
};
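
// Usage sketch (hypothetical example): spawn a few worker threads, then
// block until they have all finished. `do_work` and `run_in_parallel` are
// assumed names for illustration.
//
//     void do_work(int chunk);
//
//     void run_in_parallel(int nchunks) {
//         thread_group group;
//         for (int i = 0; i < nchunks; ++i)
//             group.create_thread(do_work, i);   // group owns the threads
//         group.join_all();                      // wait for all to complete
//     }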



/// thread_pool is a persistent set of threads watching a queue to which
/// tasks can be submitted.
///
/// Call default_thread_pool() to retrieve a pointer to a single shared
/// thread_pool that will be initialized the first time it's needed, running
/// a number of threads corresponding to the number of cores on the machine.
///
/// It's possible to create other pools, but it's not something that's
/// recommended unless you really know what you're doing and are careful
/// that the sum of threads across all pools doesn't cause you to be highly
/// over-threaded. An example of when this might be useful is if you want
/// one pool of 4 threads to handle I/O without interference from a separate
/// pool of 4 (other) threads handling computation.
///
/// Submitting an asynchronous task to the queue follows the following
/// pattern:
///
///    /* func that takes a thread ID followed possibly by more args */
///    result_t my_func (int thread_id, Arg1 arg1, ...) { }
///    pool->push (my_func, arg1, ...);
///
/// If you just want to "fire and forget", then:
///
///    pool->push (func, ...args...);
///
/// But if you need a result, or simply need to know when the task has
/// completed, note that the push() method will return a future<result_t>
/// that you can check, like this:
///
///    std::future<result_t> f = pool->push (my_task);
///
/// And then you can
///
///    find out if it's done:              if (f.valid()) ...
///    wait for it to get done:            f.wait();
///    get result (waiting if necessary):  result_t r = f.get();
///
/// A common idiom is to fire a bunch of sub-tasks at the queue, and then
/// wait for them to all complete. We provide a helper class, task_set,
/// to make this easy:
///
///    task_set tasks (pool);
///    for (int i = 0; i < n_subtasks; ++i)
///        tasks.push (pool->push (myfunc));
///    tasks.wait ();
///
/// Note that the tasks.wait() is optional -- it will be called
/// automatically when the task_set exits its scope.
///
/// The task function's first argument, the thread_id, is the thread number
/// for the pool, or -1 if it's being executed by a non-pool thread (this
/// can happen in cases where the whole pool is occupied and the calling
/// thread contributes to running the work load).
///
/// Thread pool.  Have fun, be safe.
///
class OIIO_API thread_pool {
public:
    /// Initialize the pool. This implicitly calls resize() to set the
    /// number of worker threads, defaulting to a number of workers that is
    /// one less than the number of hardware cores.
    thread_pool(int nthreads = -1);
    ~thread_pool();

    /// How many threads are in the pool?
    int size() const;

    /// Sets the number of worker threads in the pool. If the pool size is
    /// 0, any tasks added to the pool will be executed immediately by the
    /// calling thread. Requesting nthreads < 0 will cause it to resize to
    /// the number of hardware cores minus one (one less, to account for the
    /// fact that the calling thread will also contribute). BEWARE! Resizing
    /// the queue should not be done while jobs are running.
    void resize(int nthreads = -1);

    /// Return the number of currently idle threads in the queue. Zero
    /// means the queue is fully engaged.
    int idle() const;

    /// Run the user's function, which accepts a single argument: the int
    /// id of the running thread. The returned value is a templated
    /// std::future, from which the user can get the result and rethrow any
    /// exceptions. If the queue has no worker threads, the task will be run
    /// immediately by the calling thread.
    template<typename F> auto push(F&& f) -> std::future<decltype(f(0))>
    {
        auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(
            std::forward<F>(f));
        if (size() < 1) {
            (*pck)(-1);  // No worker threads, run it with the calling thread
        } else {
            auto _f = new std::function<void(int id)>(
                [pck](int id) { (*pck)(id); });
            push_queue_and_notify(_f);
        }
        return pck->get_future();
    }

    /// Run the user's function, which accepts an arbitrary set of arguments
    /// (also passed). The returned value is a templated std::future, from
    /// which the user can get the result and rethrow any exceptions. If the
    /// queue has no worker threads, the task will be run immediately by the
    /// calling thread.
    template<typename F, typename... Rest>
    auto push (F&& f, Rest&&... rest) -> std::future<decltype(f(0, rest...))>
    {
        auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
            std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
        );
        if (size() < 1) {
            (*pck)(-1);  // No worker threads, run it with the calling thread
        } else {
            auto _f = new std::function<void(int id)>([pck](int id) {
                (*pck)(id);
            });
            push_queue_and_notify (_f);
        }
        return pck->get_future();
    }

    /// If there are any tasks on the queue, pull one off and run it (on
    /// this calling thread) and return true. Otherwise (there are no
    /// pending jobs), return false immediately. This utility is what makes
    /// it possible for non-pool threads to also run tasks from the queue
    /// when they would ordinarily be idle. The thread id of the caller
    /// should be passed.
    bool run_one_task(std::thread::id id);

    /// Return true if the calling thread is part of the thread pool. This
    /// can be used to limit a pool thread from unadvisedly adding its own
    /// subtasks to clog up the pool.
    /// DEPRECATED(2.1) -- use is_worker() instead.
    bool this_thread_is_in_pool() const;

    /// Register a thread (not already in the thread pool itself) as working
    /// on tasks in the pool. This is used to avoid recursion.
    void register_worker(std::thread::id id);
    /// De-register a thread, saying it is no longer in the process of
    /// taking work from the thread pool.
    void deregister_worker(std::thread::id id);
    /// Is the thread in the pool or currently engaged in taking tasks from
    /// the pool?
    bool is_worker(std::thread::id id) const;
    bool is_worker() const { return is_worker(std::this_thread::get_id()); }
    // Non-const versions: DEPRECATED(2.1)
    bool is_worker(std::thread::id id);

    /// How many jobs are waiting to run? (Use with caution! Can be out of
    /// date by the time you look at it.)
    size_t jobs_in_queue() const;

    /// Is the pool very busy? Meaning that there are significantly more
    /// tasks in the queue waiting to run than there are threads in the
    /// pool. It may be wise for a caller to check this before submitting
    /// tasks -- if the queue is very busy, it's probably more expedient to
    /// execute the code directly rather than add it to an oversubscribed
    /// queue.
    bool very_busy() const;

private:
    // Disallow copy construction and assignment
    thread_pool(const thread_pool&) = delete;
    thread_pool(thread_pool&&) = delete;
    thread_pool& operator=(const thread_pool&) = delete;
    thread_pool& operator=(thread_pool&&) = delete;

    // PIMPL pattern hides all the guts far away from the public API
    class Impl;
    std::unique_ptr<Impl> m_impl;

    // Utility function that helps us hide the implementation
    void push_queue_and_notify(std::function<void(int id)>* f);
};



/// Return a pointer to the "default" shared thread pool. In almost all
/// ordinary circumstances, you should use this exclusively to get a
/// single shared thread pool, since creating multiple thread pools
/// could result in hilariously over-threading your application.
OIIO_API thread_pool* default_thread_pool();


/// task_set is a group of future<void>'s from a thread_queue that you can
/// add to, and when you either call wait() or just leave the task_set's
/// scope, it will wait for all the tasks in the set to be done before
/// proceeding.
///
/// A typical idiom for using this is:
///
///    void myfunc (int id) { ... do something ... }
///
///    thread_pool* pool (default_thread_pool());
///    {
///        task_set tasks (pool);
///        // Launch a bunch of tasks into the thread pool
///        for (int i = 0; i < ntasks; ++i)
///            tasks.push (pool->push (myfunc));
///        // The following brace, by ending the scope of 'tasks', will
///        // wait for all those queue tasks to finish.
///    }
///
class task_set {
public:
    task_set(thread_pool* pool = nullptr)
        : m_pool(pool ? pool : default_thread_pool())
        , m_submitter_thread(std::this_thread::get_id())
    {
    }
    ~task_set() { wait(); }

    task_set(const task_set&) = delete;
    const task_set& operator=(const task_set&) = delete;

    // Return the thread id of the thread that set up this task_set and
    // submitted its tasks to the thread pool.
    std::thread::id submitter() const { return m_submitter_thread; }

    // Save a future (presumably returned by a thread_pool::push()) as part
    // of this task set.
    void push(std::future<void>&& f)
    {
        OIIO_DASSERT(
            std::this_thread::get_id() == submitter()
            && "All tasks in a task_set should be added by the same thread");
        m_futures.emplace_back(std::move(f));
    }

    // Wait for the given taskindex (0..n-1, where n is the number of tasks
    // submitted as part of this task_set). If block == true, fully block
    // while waiting for that task to finish. If block is false, then busy
    // wait, and opportunistically run queue tasks yourself while you are
    // waiting for the task to finish.
    void wait_for_task(size_t taskindex, bool block = false);

    // Wait for all tasks in the set to finish. If block == true, fully
    // block while waiting for the pool threads to all finish. If block is
    // false, then busy wait, and opportunistically run queue tasks yourself
    // while you are waiting for other tasks to finish.
    void wait(bool block = false);

    // Debugging sanity check, called after wait(), to ensure that all the
    // tasks were completed.
    void check_done()
    {
        const std::chrono::milliseconds wait_time(0);
        for (auto&& f : m_futures)
            OIIO_ASSERT(f.wait_for(wait_time) == std::future_status::ready);
    }

private:
    thread_pool* m_pool;
    std::thread::id m_submitter_thread;
    std::vector<std::future<void>> m_futures;
};
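
// Usage sketch (hypothetical example): submit a batch of sub-tasks to the
// shared pool and wait for them all. `process_chunk`, `process_all`, and
// `nchunks` are assumed names for illustration.
//
//     void process_chunk(int thread_id, int chunk);   // thread_id comes first
//
//     void process_all(int nchunks) {
//         thread_pool* pool = default_thread_pool();
//         task_set tasks(pool);
//         for (int i = 0; i < nchunks; ++i)
//             tasks.push(pool->push(process_chunk, i));
//         // tasks.wait() is implied when `tasks` goes out of scope.
//     }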


OIIO_NAMESPACE_END