thread.h
1 /*
2  Copyright 2008 Larry Gritz and the other authors and contributors.
3  All Rights Reserved.
4 
5  Redistribution and use in source and binary forms, with or without
6  modification, are permitted provided that the following conditions are
7  met:
8  * Redistributions of source code must retain the above copyright
9  notice, this list of conditions and the following disclaimer.
10  * Redistributions in binary form must reproduce the above copyright
11  notice, this list of conditions and the following disclaimer in the
12  documentation and/or other materials provided with the distribution.
13  * Neither the name of the software's owners nor the names of its
14  contributors may be used to endorse or promote products derived from
15  this software without specific prior written permission.
16  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 
28  (This is the Modified BSD License)
29 */
30 
31 // clang-format off
32 
33 /////////////////////////////////////////////////////////////////////////
34 /// @file thread.h
35 ///
36 /// @brief Wrappers and utilities for multithreading.
37 /////////////////////////////////////////////////////////////////////////
38 
39 
40 #pragma once
41 
42 #include <algorithm>
43 #include <atomic>
44 #include <chrono>
45 #include <functional>
46 #include <future>
47 #include <iostream>
48 #include <memory>
49 #include <mutex>
50 #include <thread>
51 #include <vector>
52 
53 #include <OpenImageIO/atomic.h>
54 #include <OpenImageIO/dassert.h>
55 #include <OpenImageIO/export.h>
56 #include <OpenImageIO/oiioversion.h>
57 #include <OpenImageIO/platform.h>
58 
59 
60 
61 // OIIO_THREAD_ALLOW_DCLP, if set to 0, prevents us from using a dodgy
62 // "double checked lock pattern" (DCLP). We are very careful to construct
63 // it safely and correctly, and these uses improve thread performance for
64 // us. But it confuses Thread Sanitizer, so this switch allows you to turn
65 // it off. Also set to 0 if you don't believe that we are correct in
66 // allowing this construct on all platforms.
67 #ifndef OIIO_THREAD_ALLOW_DCLP
68 # define OIIO_THREAD_ALLOW_DCLP 1
69 #endif
70 
71 
72 
73 // Some helpful links:
74 //
75 // Descriptions of the "new" gcc atomic intrinsics:
76 // https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html
77 // Old gcc atomic intrinsics:
78 // https://gcc.gnu.org/onlinedocs/gcc-4.4.2/gcc/Atomic-Builtins.html
79 // C++11 and beyond std::atomic:
80 // http://en.cppreference.com/w/cpp/atomic
81 
82 
83 
84 OIIO_NAMESPACE_BEGIN
85 
86 /// Null mutex that can be substituted for a real one to test how much
87 /// overhead is associated with a particular mutex.
88 class null_mutex {
89 public:
90  null_mutex() {}
91  ~null_mutex() {}
92  void lock() {}
93  void unlock() {}
94  void lock_shared() {}
95  void unlock_shared() {}
96  bool try_lock() { return true; }
97 };
98 
99 /// Null lock that can be substituted for a real one to test how much
100 /// overhead is associated with a particular lock.
101 template<typename T> class null_lock {
102 public:
103  null_lock(T& m) {}
104 };
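As a usage illustration (not part of this header): a class can be templated on its mutex type so that null_mutex can be dropped in to measure how much of the runtime is locking overhead. This is a minimal sketch; Counter, time_increments, and the iteration count are hypothetical, and it assumes the usual OIIO namespace alias.

#include <chrono>
#include <cstdio>
#include <mutex>
#include <OpenImageIO/thread.h>

template<class MutexT>
struct Counter {
    MutexT mutex;
    long value = 0;
    void increment() {
        std::lock_guard<MutexT> lock(mutex);  // effectively a no-op when MutexT is null_mutex
        ++value;
    }
};

template<class MutexT>
double time_increments(long n) {
    Counter<MutexT> c;
    auto t0 = std::chrono::steady_clock::now();
    for (long i = 0; i < n; ++i)
        c.increment();
    return std::chrono::duration<double>(std::chrono::steady_clock::now() - t0).count();
}

int main() {
    const long n = 10000000;
    std::printf("real mutex: %.3f s\n", time_increments<OIIO::mutex>(n));
    std::printf("null mutex: %.3f s\n", time_increments<OIIO::null_mutex>(n));
}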
105 
106 
107 
108 using std::mutex;
109 using std::recursive_mutex;
110 using std::thread;
111 typedef std::lock_guard<mutex> lock_guard;
112 typedef std::lock_guard<recursive_mutex> recursive_lock_guard;
113 
114 
115 
116 /// Yield the processor for the rest of the timeslice.
117 ///
118 inline void
119 yield()
120 {
121 #if defined(__GNUC__)
122  sched_yield();
123 #elif defined(_MSC_VER)
124  SwitchToThread();
125 #else
126 # error No yield on this platform.
127 #endif
128 }
129 
130 
131 
132 // Slight pause
133 inline void
134 pause(int delay)
135 {
136 #if defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__))
137  for (int i = 0; i < delay; ++i)
138  __asm__ __volatile__("pause;");
139 
140 #elif defined(__GNUC__) && (defined(__arm__) || defined(__s390__))
141  for (int i = 0; i < delay; ++i)
142  __asm__ __volatile__("NOP;");
143 
144 #elif defined(_MSC_VER)
145  for (int i = 0; i < delay; ++i) {
146 # if defined(_WIN64)
147  YieldProcessor();
148 # else
149  _asm pause
150 # endif /* _WIN64 */
151  }
152 
153 #else
154  // No pause on this platform, just punt
155  for (int i = 0; i < delay; ++i)
156  ;
157 #endif
158 }
159 
160 
161 
162 // Helper class to deliver ever longer pauses until we yield our timeslice.
163 class atomic_backoff {
164 public:
165  atomic_backoff(int pausemax = 16)
166  : m_count(1)
167  , m_pausemax(pausemax)
168  {
169  }
170 
171  void operator()()
172  {
173  if (m_count <= m_pausemax) {
174  pause(m_count);
175  m_count *= 2;
176  } else {
177  yield();
178  }
179  }
180 
181 private:
182  int m_count;
183  int m_pausemax;
184 };
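A brief sketch of using atomic_backoff directly (the ready flag and wait_until_ready are illustrative, not part of this header): spin-wait for another thread to publish a result, pausing progressively longer between polls instead of hammering the cache line.

#include <atomic>
#include <OpenImageIO/thread.h>

std::atomic<bool> ready { false };

void wait_until_ready()
{
    OIIO::atomic_backoff backoff;                   // starts with short pauses
    while (!ready.load(std::memory_order_acquire))
        backoff();                                  // pause, doubling up to pausemax, then yield
}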
185 
186 
187 
188 
189 /// A spin_mutex is semantically equivalent to a regular mutex, except
190 /// for the following:
191 /// - A spin_mutex is just 1 byte, whereas a regular mutex is quite
192 /// large (44 bytes for pthread).
193 /// - A spin_mutex is extremely fast to lock and unlock, whereas a regular
194 /// mutex is surprisingly expensive just to acquire a lock.
195 /// - A spin_mutex takes CPU while it waits, so this can be very
196 /// wasteful compared to a regular mutex that blocks (gives up its
197 /// CPU slices until it acquires the lock).
198 ///
199 /// The bottom line is that mutex is the usual choice, but in cases where
200 /// you need to acquire locks very frequently, but only need to hold the
201 /// lock for a very short period of time, you may save runtime by using
202 /// a spin_mutex, even though it's non-blocking.
203 ///
204 /// N.B. A spin_mutex is only the size of a bool. To avoid "false
205 /// sharing", be careful not to put two spin_mutex objects on the same
206 /// cache line (within 128 bytes of each other), or the two mutexes may
207 /// effectively (and wastefully) lock against each other.
208 ///
209 class spin_mutex {
210 public:
211  spin_mutex(void) {}
212  ~spin_mutex(void) {}
213 
214  /// Copy constructor -- initialize to unlocked.
215  ///
216  spin_mutex(const spin_mutex&) {}
217 
218  /// Assignment does not do anything, since lockedness should not
219  /// transfer.
220  const spin_mutex& operator=(const spin_mutex&) { return *this; }
221 
222  /// Acquire the lock, spin until we have it.
223  ///
224  void lock()
225  {
226  // To avoid spinning too tightly, we use the atomic_backoff to
227  // provide increasingly longer pauses, and if the lock is under
228  // lots of contention, eventually yield the timeslice.
229  atomic_backoff backoff;
230 
231  // Try to get ownership of the lock. Through experimentation, we
232  // found that OIIO_UNLIKELY makes this just a bit faster on gcc
233  // x86/x86_64 systems.
234  while (!OIIO_UNLIKELY(try_lock())) {
235 #if OIIO_THREAD_ALLOW_DCLP
236  // The full try_lock() involves a test_and_set, which
237  // writes memory, and that will lock the bus. But a normal
238  // read of m_locked will let us spin until the value
239  // changes, without locking the bus. So it's faster to
240  // check in this manner until the mutex appears to be free.
241  // HOWEVER... Thread Sanitizer thinks this is an instance of
242  // an unsafe "double checked lock pattern" (DCLP) and flags it
243  // as an error. I think it's a false positive, because the
244  // outer loop is still an atomic check, the inner non-atomic
245  // loop only serves to delay, and can't lead to a true data
246  // race. But we provide this build-time switch to, at least,
247  // give a way to use tsan for other checks.
248  do {
249  backoff();
250  } while (*(volatile bool*)&m_locked);
251 #else
252  backoff();
253 #endif
254  }
255  }
256 
257  /// Release the lock that we hold.
258  ///
259  void unlock()
260  {
261  // Fastest way to do it is with a clear with "release" semantics
262  m_locked.clear(std::memory_order_release);
263  }
264 
265  /// Try to acquire the lock. Return true if we have it, false if
266  /// somebody else is holding the lock.
267  bool try_lock()
268  {
269  return !m_locked.test_and_set(std::memory_order_acquire);
270  }
271 
272  /// Helper class: scoped lock for a spin_mutex -- grabs the lock upon
273  /// construction, releases the lock when it exits scope.
274  class lock_guard {
275  public:
276  lock_guard(spin_mutex& fm)
277  : m_fm(fm)
278  {
279  m_fm.lock();
280  }
281  ~lock_guard() { m_fm.unlock(); }
282 
283  private:
284  lock_guard() = delete;
285  lock_guard(const lock_guard& other) = delete;
286  lock_guard& operator=(const lock_guard& other) = delete;
287  spin_mutex& m_fm;
288  };
289 
290 private:
291  std::atomic_flag m_locked = ATOMIC_FLAG_INIT; // initialize to unlocked
292 };
293 
294 
295 typedef spin_mutex::lock_guard spin_lock;
296 
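A minimal usage sketch (the Stats struct is hypothetical, and the usual OIIO namespace alias is assumed): a spin_mutex guarding a critical section that is only a few instructions long, which is exactly the situation the comment above recommends it for.

#include <OpenImageIO/thread.h>

struct Stats {
    OIIO::spin_mutex mutex;
    long hits = 0;

    void add_hit()
    {
        OIIO::spin_lock lock(mutex);   // scoped guard: locks here, unlocks at end of scope
        ++hits;
    }
};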
297 
298 
299 #if 0
300 
301 // OLD CODE vvvvvvvv
302 
303 
304 /// Spinning reader/writer mutex. This is just like spin_mutex, except
305 /// that there are separate locking mechanisms for "writers" (exclusive
306 /// holders of the lock, presumably because they are modifying whatever
307 /// the lock is protecting) and "readers" (non-exclusive, non-modifying
308 /// tasks that may access the protectee simultaneously).
309 class spin_rw_mutex {
310 public:
311  /// Default constructor -- initialize to unlocked.
312  ///
313  spin_rw_mutex (void) { m_readers = 0; }
314 
315  ~spin_rw_mutex (void) { }
316 
317  /// Copy constructor -- initialize to unlocked.
318  ///
319  spin_rw_mutex (const spin_rw_mutex &) { m_readers = 0; }
320 
321  /// Assignment does not do anything, since lockedness should not
322  /// transfer.
323  const spin_rw_mutex& operator= (const spin_rw_mutex&) { return *this; }
324 
325  /// Acquire the reader lock.
326  ///
327  void read_lock () {
328  // Spin until there are no writers active
329  m_locked.lock();
330  // Register ourself as a reader
331  ++m_readers;
332  // Release the lock, to let other readers work
333  m_locked.unlock();
334  }
335 
336  /// Release the reader lock.
337  ///
338  void read_unlock () {
339  --m_readers; // it's atomic, no need to lock to release
340  }
341 
342  /// Acquire the writer lock.
343  ///
344  void write_lock () {
345  // Make sure no new readers (or writers) can start
346  m_locked.lock();
347  // Spin until the last reader is done, at which point we will be
348  // the sole owners and nobody else (reader or writer) can acquire
349  // the resource until we release it.
350 #if OIIO_THREAD_ALLOW_DCLP
351  while (*(volatile int *)&m_readers > 0)
352  ;
353 #else
354  while (m_readers > 0)
355  ;
356 #endif
357  }
358 
359  /// Release the writer lock.
360  ///
361  void write_unlock () {
362  // Let other readers or writers get the lock
363  m_locked.unlock ();
364  }
365 
366  /// Acquire an exclusive ("writer") lock.
367  void lock () { write_lock(); }
368 
369  /// Release an exclusive ("writer") lock.
370  void unlock () { write_unlock(); }
371 
372  /// Acquire a shared ("reader") lock.
373  void lock_shared () { read_lock(); }
374 
375  /// Release a shared ("reader") lock.
376  void unlock_shared () { read_unlock(); }
377 
378  /// Helper class: scoped read lock for a spin_rw_mutex -- grabs the
379  /// read lock upon construction, releases the lock when it exits scope.
380  class read_lock_guard {
381  public:
382  read_lock_guard (spin_rw_mutex &fm) : m_fm(fm) { m_fm.read_lock(); }
383  ~read_lock_guard () { m_fm.read_unlock(); }
384  private:
385  read_lock_guard(); // Do not implement
386  read_lock_guard(const read_lock_guard& other); // Do not implement
387  read_lock_guard& operator = (const read_lock_guard& other); // Do not implement
388  spin_rw_mutex & m_fm;
389  };
390 
391  /// Helper class: scoped write lock for a spin_rw_mutex -- grabs the
392  /// write lock upon construction, releases the lock when it exits scope.
393  class write_lock_guard {
394  public:
395  write_lock_guard (spin_rw_mutex &fm) : m_fm(fm) { m_fm.write_lock(); }
396  ~write_lock_guard () { m_fm.write_unlock(); }
397  private:
398  write_lock_guard(); // Do not implement
399  write_lock_guard(const write_lock_guard& other); // Do not implement
400  write_lock_guard& operator = (const write_lock_guard& other); // Do not implement
401  spin_rw_mutex & m_fm;
402  };
403 
404 private:
405  OIIO_CACHE_ALIGN
406  spin_mutex m_locked; // write lock
407  char pad1_[OIIO_CACHE_LINE_SIZE-sizeof(spin_mutex)];
408  OIIO_CACHE_ALIGN
409  atomic_int m_readers; // number of readers
410  char pad2_[OIIO_CACHE_LINE_SIZE-sizeof(atomic_int)];
411 };
412 
413 
414 #else
415 
416 // vvv New spin rw lock Oct 2017
417 
418 /// Spinning reader/writer mutex. This is just like spin_mutex, except
419 /// that there are separate locking mechanisms for "writers" (exclusive
420 /// holders of the lock, presumably because they are modifying whatever
421 /// the lock is protecting) and "readers" (non-exclusive, non-modifying
422 /// tasks that may access the protectee simultaneously).
423 class spin_rw_mutex {
424 public:
425  /// Default constructor -- initialize to unlocked.
426  ///
427  spin_rw_mutex() {}
428 
429  ~spin_rw_mutex() {}
430 
431  // Do not allow copy or assignment.
432  spin_rw_mutex(const spin_rw_mutex&) = delete;
433  const spin_rw_mutex& operator=(const spin_rw_mutex&) = delete;
434 
435  /// Acquire the reader lock.
436  ///
437  void read_lock()
438  {
439  // first increase the readers, and if it turned out nobody was
440  // writing, we're done. This means that acquiring a read when nobody
441  // is writing is a single atomic operation.
442  int oldval = m_bits.fetch_add(1, std::memory_order_acquire);
443  if (!(oldval & WRITER))
444  return;
445  // Oops, we incremented readers but somebody was writing. Backtrack
446  // by subtracting, and do things the hard way.
447  int expected = (--m_bits) & NOTWRITER;
448 
449  // Do compare-and-exchange until we can increase the number of
450  // readers by one and have no writers.
451  if (m_bits.compare_exchange_weak(expected, expected + 1,
452  std::memory_order_acquire))
453  return;
454  atomic_backoff backoff;
455  do {
456  backoff();
457  expected = m_bits.load() & NOTWRITER;
458  } while (!m_bits.compare_exchange_weak(expected, expected + 1,
459  std::memory_order_acquire));
460  }
461 
462  /// Release the reader lock.
463  ///
464  void read_unlock()
465  {
466  // Atomically reduce the number of readers. It's at least 1,
467  // and the WRITER bit should definitely not be set, so this just
468  // boils down to an atomic decrement of m_bits.
469  m_bits.fetch_sub(1, std::memory_order_release);
470  }
471 
472  /// Acquire the writer lock.
473  ///
474  void write_lock()
475  {
476  // Do compare-and-exchange until we have just ourselves as writer
477  int expected = 0;
478  if (m_bits.compare_exchange_weak(expected, WRITER,
479  std::memory_order_acquire))
480  return;
481  atomic_backoff backoff;
482  do {
483  backoff();
484  expected = 0;
485  } while (!m_bits.compare_exchange_weak(expected, WRITER,
486  std::memory_order_acquire));
487  }
488 
489  /// Release the writer lock.
490  ///
491  void write_unlock()
492  {
493  // Remove the writer bit
494  m_bits.fetch_sub(WRITER, std::memory_order_release);
495  }
496 
497  /// Helper class: scoped read lock for a spin_rw_mutex -- grabs the
498  /// read lock upon construction, releases the lock when it exits scope.
499  class read_lock_guard {
500  public:
501  read_lock_guard (spin_rw_mutex &fm) : m_fm(fm) { m_fm.read_lock(); }
502  ~read_lock_guard () { m_fm.read_unlock(); }
503  private:
504  read_lock_guard(const read_lock_guard& other) = delete;
505  read_lock_guard& operator = (const read_lock_guard& other) = delete;
506  spin_rw_mutex & m_fm;
507  };
508 
509  /// Helper class: scoped write lock for a spin_rw_mutex -- grabs the
510  /// write lock upon construction, releases the lock when it exits scope.
511  class write_lock_guard {
512  public:
513  write_lock_guard (spin_rw_mutex &fm) : m_fm(fm) { m_fm.write_lock(); }
514  ~write_lock_guard () { m_fm.write_unlock(); }
515  private:
516  write_lock_guard(const write_lock_guard& other) = delete;
517  write_lock_guard& operator = (const write_lock_guard& other) = delete;
518  spin_rw_mutex & m_fm;
519  };
520 
521 private:
522  // Use one word to hold the reader count, with a high bit indicating
523  // that it's locked for writing. This will only work if we have
524  // fewer than 2^30 simultaneous readers. I think that should hold
525  // us for some time.
526  enum { WRITER = 1<<30, NOTWRITER = WRITER-1 };
527  std::atomic<int> m_bits { 0 };
528 };
529 
530 #endif
531 
532 
533 typedef spin_rw_mutex::read_lock_guard spin_rw_read_lock;
534 typedef spin_rw_mutex::write_lock_guard spin_rw_write_lock;
535 
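A sketch of reader/writer usage with the scoped guards just defined (Registry and its members are illustrative): many threads may call lookup() concurrently under the shared lock, while update() takes the exclusive lock.

#include <map>
#include <string>
#include <OpenImageIO/thread.h>

struct Registry {
    OIIO::spin_rw_mutex mutex;
    std::map<std::string, int> table;

    int lookup(const std::string& key)
    {
        OIIO::spin_rw_read_lock lock(mutex);   // shared: readers do not block each other
        auto it = table.find(key);
        return it == table.end() ? -1 : it->second;
    }

    void update(const std::string& key, int value)
    {
        OIIO::spin_rw_write_lock lock(mutex);  // exclusive: waits for readers to drain
        table[key] = value;
    }
};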
536 
537 
538 /// Mutex pool. Sometimes, we have lots of objects that need to be
539 /// individually locked for thread safety, but two separate objects don't
540 /// need to lock against each other. If there are many more objects than
541 /// threads, it's wasteful for each object to contain its own mutex. So a
542 /// solution is to make a mutex_pool -- a collection of several mutexes.
543 /// Each object uses a hash to choose a consistent mutex for itself, one
544 /// that is unlikely to be locked simultaneously by a different object.
545 /// Semantically, it looks rather like an associative array of mutexes. We
546 /// also ensure that the mutexes are all on different cache lines, to ensure
547 /// that they don't exhibit false sharing. Try to choose Bins larger than
548 /// the expected number of threads that will be simultaneously locking
549 /// mutexes.
550 template<class Mutex, class Key, class Hash, size_t Bins = 16>
551 class mutex_pool {
552 public:
553  mutex_pool() {}
554  Mutex& operator[](const Key& key) { return m_mutex[m_hash(key) % Bins].m; }
555 
556 private:
557  // Helper type -- force cache line alignment. This should make an array
558  // of these also have padding so that each individual mutex is aligned
559  // to its own cache line, thus eliminating any "false sharing."
560  struct AlignedMutex {
561  OIIO_CACHE_ALIGN Mutex m;
562  };
563 
564  AlignedMutex m_mutex[Bins];
565  Hash m_hash;
566 };
567 
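A hedged sketch of a mutex_pool keyed by object address (Item, ItemHash, and the bin count are hypothetical): many Item objects share 64 mutexes, and the same Item always maps to the same one.

#include <cstddef>
#include <OpenImageIO/thread.h>

struct Item { int value = 0; };

struct ItemHash {
    std::size_t operator()(const Item* p) const
    {
        return std::size_t(p) >> 4;   // cheap address-based hash
    }
};

static OIIO::mutex_pool<OIIO::mutex, const Item*, ItemHash, 64> item_mutexes;

void bump(Item* item)
{
    // Unrelated Items are unlikely to contend, yet no Item carries its own mutex.
    OIIO::lock_guard lock(item_mutexes[item]);
    ++item->value;
}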
568 
569 
570 /// Simple thread group class: lets you spawn a group of new threads,
571 /// then wait for them to all complete.
572 class thread_group {
573 public:
574  thread_group() {}
575  ~thread_group() { join_all(); }
576 
577  void add_thread(thread* t)
578  {
579  if (t) {
580  lock_guard lock(m_mutex);
581  m_threads.emplace_back(t);
582  }
583  }
584 
585  template<typename FUNC, typename... Args>
586  thread* create_thread(FUNC func, Args&&... args)
587  {
588  thread* t = new thread(func, std::forward<Args>(args)...);
589  add_thread(t);
590  return t;
591  }
592 
593  void join_all()
594  {
595  lock_guard lock(m_mutex);
596  for (auto& t : m_threads)
597  if (t->joinable())
598  t->join();
599  }
600 
601  size_t size() const
602  {
603  lock_guard lock(m_mutex);
604  return m_threads.size();
605  }
606 
607 private:
608  mutable mutex m_mutex;
609  std::vector<std::unique_ptr<thread>> m_threads;
610 };
611 
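A minimal thread_group sketch (worker() is illustrative): spawn a few threads, then block until all of them have finished.

#include <cstdio>
#include <OpenImageIO/thread.h>

void worker(int index)
{
    std::printf("worker %d running\n", index);
}

int main()
{
    OIIO::thread_group group;
    for (int i = 0; i < 4; ++i)
        group.create_thread(worker, i);
    group.join_all();   // wait for all four threads to complete
}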
612 
613 
614 /// thread_pool is a persistent set of threads watching a queue to which
615 /// tasks can be submitted.
616 ///
617 /// Call default_thread_pool() to retrieve a pointer to a single shared
618 /// thread_pool that will be initialized the first time it's needed, running
619 /// a number of threads corresponding to the number of cores on the machine.
620 ///
621 /// It's possible to create other pools, but it's not something that's
622 /// recommended unless you really know what you're doing and are careful
623 /// that the sum of threads across all pools doesn't cause you to be highly
624 /// over-threaded. An example of when this might be useful is if you want
625 /// one pool of 4 threads to handle I/O without interference from a separate
626 /// pool of 4 (other) threads handling computation.
627 ///
628 /// Submitting an asynchronous task to the queue follows this
629 /// pattern:
630 /// /* func that takes a thread ID followed possibly by more args */
631 /// result_t my_func (int thread_id, Arg1 arg1, ...) { }
632 /// pool->push (my_func, arg1, ...);
633 ///
634 /// If you just want to "fire and forget", then:
635 /// pool->push (func, ...args...);
636 /// But if you need a result, or simply need to know when the task has
637 /// completed, note that the push() method will return a future<result_t>
638 /// that you can check, like this:
639 /// std::future<result_t> f = pool->push (my_task);
640 /// And then you can
641 /// find out if it's done: if (f.valid()) ...
642 /// wait for it to get done: f.wait();
643 /// get result (waiting if necessary): result_t r = f.get();
644 ///
645 /// A common idiom is to fire a bunch of sub-tasks at the queue, and then
646 /// wait for them to all complete. We provide a helper class, task_set,
647 /// to make this easy:
648 /// task_set tasks (pool);
649 /// for (int i = 0; i < n_subtasks; ++i)
650 /// tasks.push (pool->push (myfunc));
651 /// tasks.wait ();
652 /// Note that the tasks.wait() is optional -- it will be called
653 /// automatically when the task_set exits its scope.
654 ///
655 /// The task function's first argument, the thread_id, is the thread number
656 /// for the pool, or -1 if it's being executed by a non-pool thread (this
657 /// can happen in cases where the whole pool is occupied and the calling
658 /// thread contributes to running the work load).
659 ///
660 /// Thread pool. Have fun, be safe.
661 ///
662 class OIIO_API thread_pool {
663 public:
664  /// Initialize the pool. This implicitly calls resize() to set the
665  /// number of worker threads, defaulting to a number of workers that is
666  /// one less than the number of hardware cores.
667  thread_pool(int nthreads = -1);
668  ~thread_pool();
669 
670  /// How many threads are in the pool?
671  int size() const;
672 
673  /// Sets the number of worker threads in the pool. If the pool size is
674  /// 0, any tasks added to the pool will be executed immediately by the
675  /// calling thread. Requesting nthreads < 0 will cause it to resize to
676  /// the number of hardware cores minus one (one less, to account for the
677  /// fact that the calling thread will also contribute). BEWARE! Resizing
678  /// the queue should not be done while jobs are running.
679  void resize(int nthreads = -1);
680 
681  /// Return the number of currently idle threads in the queue. Zero
682  /// means the queue is fully engaged.
683  int idle() const;
684 
685  /// Run the user's function, which accepts a single int argument: the id of
686  /// the running thread. The returned value is a templatized std::future, where
687  /// the user can get the result and rethrow any exceptions. If the queue
688  /// has no worker threads, the task will be run immediately by the
689  /// calling thread.
690  template<typename F> auto push(F&& f) -> std::future<decltype(f(0))>
691  {
692  auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(
693  std::forward<F>(f));
694  if (size() < 1) {
695  (*pck)(-1); // No worker threads, run it with the calling thread
696  } else {
697  auto _f = new std::function<void(int id)>(
698  [pck](int id) { (*pck)(id); });
699  push_queue_and_notify(_f);
700  }
701  return pck->get_future();
702  }
703 
704  /// Run the user's function, which accepts the thread id followed by an
705  /// arbitrary set of additional arguments. The returned value is a templatized std::future, where
706  /// the user can get the result and rethrow any exceptions. If the queue
707  /// has no worker threads, the task will be run immediately by the
708  /// calling thread.
709  template<typename F, typename... Rest>
710  auto push (F&& f, Rest&&... rest) -> std::future<decltype(f(0, rest...))> {
711  auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
712  std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
713  );
714  if (size() < 1) {
715  (*pck)(-1); // No worker threads, run it with the calling thread
716  } else {
717  auto _f = new std::function<void(int id)>([pck](int id) {
718  (*pck)(id);
719  });
720  push_queue_and_notify (_f);
721  }
722  return pck->get_future();
723  }
724 
725  /// If there are any tasks on the queue, pull one off and run it (on
726  /// this calling thread) and return true. Otherwise (there are no
727  /// pending jobs), return false immediately. This utility is what makes
728  /// it possible for non-pool threads to also run tasks from the queue
729  /// when they would ordinarily be idle. The thread id of the caller
730  /// should be passed.
731  bool run_one_task(std::thread::id id);
732 
733  /// Return true if the calling thread is part of the thread pool. This
734  /// can be used to prevent a pool thread from unwisely adding its own
735  /// subtasks and clogging up the pool.
736  bool this_thread_is_in_pool() const;
737 
738  /// Register a thread (not already in the thread pool itself) as working
739  /// on tasks in the pool. This is used to avoid recursion.
740  void register_worker(std::thread::id id);
741  /// De-register a thread, saying it is no longer in the process of
742  /// taking work from the thread pool.
743  void deregister_worker(std::thread::id id);
744  /// Is the thread in the pool or currently engaged in taking tasks from
745  /// the pool?
746  bool is_worker(std::thread::id id);
747  bool is_worker() { return is_worker(std::this_thread::get_id()); }
748 
749  /// How many jobs are waiting to run? (Use with caution! Can be out of
750  /// date by the time you look at it.)
751  size_t jobs_in_queue() const;
752 
753  /// Is the pool very busy? Meaning that there are significantly more
754  /// tasks in the queue waiting to run than there are threads in the
755  /// pool. It may be wise for a caller to check this before submitting
756  /// tasks -- if the queue is very busy, it's probably more expedient to
757  /// execute the code directly rather than add it to an oversubscribed
758  /// queue.
759  bool very_busy() const;
760 
761 private:
762  // Disallow copy construction and assignment
763  thread_pool(const thread_pool&) = delete;
764  thread_pool(thread_pool&&) = delete;
765  thread_pool& operator=(const thread_pool&) = delete;
766  thread_pool& operator=(thread_pool&&) = delete;
767 
768  // PIMPL pattern hides all the guts far away from the public API
769  class Impl;
770  std::unique_ptr<Impl> m_impl;
771 
772  // Utility function that helps us hide the implementation
773  void push_queue_and_notify(std::function<void(int id)>* f);
774 };
775 
776 
777 
778 /// Return a pointer to the "default" shared thread pool. In almost all
779 /// ordinary circumstances, you should use this exclusively to get a
780 /// single shared thread pool, since creating multiple thread pools
781 /// could result in hilariously over-threading your application.
782 OIIO_API thread_pool* default_thread_pool();
783 
784 
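A sketch of submitting tasks to the shared pool and collecting results through the returned futures (square() and the task count are illustrative). Note that the first argument every task receives is the pool's thread id.

#include <cstdio>
#include <future>
#include <vector>
#include <OpenImageIO/thread.h>

static int square(int /*thread_id*/, int x) { return x * x; }

int main()
{
    OIIO::thread_pool* pool = OIIO::default_thread_pool();
    std::vector<std::future<int>> results;
    for (int i = 0; i < 8; ++i)
        results.push_back(pool->push(square, i));
    for (auto& r : results)
        std::printf("%d\n", r.get());   // get() blocks until that task has finished
}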
785 
786 /// task_set is a group of future<void>'s from a thread_queue that you can
787 /// add to, and when you either call wait() or just leave the task_set's
788 /// scope, it will wait for all the tasks in the set to be done before
789 /// proceeding.
790 ///
791 /// A typical idiom for using this is:
792 ///
793 /// void myfunc (int id) { ... do something ... }
794 ///
795 /// thread_pool* pool (default_thread_pool());
796 /// {
797 /// task_set tasks (pool);
798 /// // Launch a bunch of tasks into the thread pool
799 /// for (int i = 0; i < ntasks; ++i)
800 /// tasks.push (pool->push (myfunc));
801 /// // The following brace, by ending the scope of 'tasks', will
802 /// // wait for all those queue tasks to finish.
803 /// }
804 ///
805 class OIIO_API task_set {
806 public:
807  task_set(thread_pool* pool = nullptr)
808  : m_pool(pool ? pool : default_thread_pool())
809  , m_submitter_thread(std::this_thread::get_id())
810  {
811  }
812  ~task_set() { wait(); }
813 
814  task_set(const task_set&) = delete;
815  const task_set& operator=(const task_set&) = delete;
816 
817  // Return the thread id of the thread that set up this task_set and
818  // submitted its tasks to the thread pool.
819  std::thread::id submitter() const { return m_submitter_thread; }
820 
821  // Save a future (presumably returned by a threadpool::push() as part
822  // of this task set.
823  void push(std::future<void>&& f)
824  {
825  DASSERT(
826  std::this_thread::get_id() == submitter()
827  && "All tasks in a tast_set should be added by the same thread");
828  m_futures.emplace_back(std::move(f));
829  }
830 
831  // Wait for the given taskindex (0..n-1, where n is the number of tasks
832  // submitted as part of this task_set). If block == true, fully block
833  // while waiting for that task to finish. If block is false, then busy
834  // wait, and opportunistically run queue tasks yourself while you are
835  // waiting for the task to finish.
836  void wait_for_task(size_t taskindex, bool block = false);
837 
838  // Wait for all tasks in the set to finish. If block == true, fully
839  // block while waiting for the pool threads to all finish. If block is
840  // false, then busy wait, and opportunistically run queue tasks yourself
841  // while you are waiting for other tasks to finish.
842  void wait(bool block = false);
843 
844  // Debugging sanity check, called after wait(), to ensure that all the
845  // tasks were completed.
846  void check_done()
847  {
848  const std::chrono::milliseconds wait_time(0);
849  for (auto&& f : m_futures)
850  ASSERT(f.wait_for(wait_time) == std::future_status::ready);
851  }
852 
853 private:
854  thread_pool* m_pool;
855  std::thread::id m_submitter_thread;
856  std::vector<std::future<void>> m_futures;
857 };
858 
859 
860 OIIO_NAMESPACE_END
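A compilable version of the task_set idiom described above (myfunc and run_chunks are illustrative): all tasks are guaranteed to have finished by the time 'tasks' leaves scope.

#include <OpenImageIO/thread.h>

static void myfunc(int /*thread_id*/, int /*chunk*/)
{
    // ... do something ...
}

void run_chunks(int nchunks)
{
    OIIO::thread_pool* pool = OIIO::default_thread_pool();
    OIIO::task_set tasks(pool);
    for (int i = 0; i < nchunks; ++i)
        tasks.push(pool->push(myfunc, i));
    tasks.wait();   // optional: the task_set destructor would wait anyway
}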