HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
parallel.h
Go to the documentation of this file.
1 // Copyright Contributors to the OpenImageIO project.
2 // SPDX-License-Identifier: Apache-2.0
3 // https://github.com/AcademySoftwareFoundation/OpenImageIO
4 
5 #pragma once
6 
7 #include <algorithm>
8 #include <atomic>
9 #include <future>
10 #include <memory>
11 #include <mutex>
12 #include <thread>
13 #include <vector>
14 
16 #include <OpenImageIO/strutil.h>
17 #include <OpenImageIO/thread.h>
18 
19 
21 
22 /// Split strategies
23 /// DEPRECATED(2.4)
25 
26 
27 /// Encapsulation of options that control parallel_for() and
28 /// parallel_image().
29 /// DEPRECATED(2.4)
31 public:
33  size_t minitems = 16384)
37  {
38  }
40  SplitDir splitdir = Split_Y, size_t minitems = 16384)
44  , name(name)
45  {
46  }
47 
48  // Fix up all the TBD parameters:
49  // * If no pool was specified, use the default pool.
50  // * If no max thread count was specified, use the pool size.
51  // * If the calling thread is itself in the pool and the recursive flag
52  // was not turned on, just use one thread.
53  void resolve()
54  {
55  if (pool == nullptr)
57  if (maxthreads <= 0)
58  maxthreads = pool->size() + 1; // pool size + caller
59  if (!recursive && pool->is_worker())
60  maxthreads = 1;
61  }
62 
63  bool singlethread() const { return maxthreads == 1; }
64 
65  int maxthreads = 0; // Max threads (0 = use all)
66  SplitDir splitdir = Split_Y; // Primary split direction
67  bool recursive = false; // Allow thread pool recursion
68  size_t minitems = 16384; // Min items per task
69  thread_pool* pool = nullptr; // If non-NULL, custom thread pool
70  string_view name; // For debugging
71 };
72 
73 
74 
75 #define OIIO_PARALLEL_PAROPT
76 
77 /// Encapsulation of options that control parallel_for() and
78 /// parallel_image().
80 public:
81  enum class ParStrategy : short { Default = 0, TryTBB, OIIOpool };
82  enum class SplitDir : short { X, Y, Z, Biggest, Tile };
83 
84  constexpr paropt(int maxthreads = 0, SplitDir splitdir = SplitDir::Y,
85  size_t minitems = 1024) noexcept
86  : m_maxthreads(maxthreads)
87  , m_splitdir(splitdir)
88  , m_minitems(minitems)
89  {
90  }
91  paropt(string_view name, int maxthreads = 0,
92  SplitDir splitdir = SplitDir::Y, size_t minitems = 1024) noexcept
93  : paropt(maxthreads, splitdir, minitems)
94  {
95  // m_name = name;
96  }
97 
98  constexpr paropt(ParStrategy strat) noexcept
99  : m_strategy(strat)
100  {
101  }
102 
103  constexpr paropt(int maxthreads, ParStrategy strat) noexcept
104  : m_maxthreads(maxthreads)
105  , m_strategy(strat)
106  {
107  }
108 
109  // For back compatibility
110  paropt(const parallel_options& po) noexcept
111  : paropt(po.name, po.maxthreads, SplitDir(po.splitdir), po.minitems)
112  {
113  m_recursive = po.recursive;
114  m_pool = po.pool;
115  }
116 
117  // Fix up all the TBD parameters:
118  // * If no pool was specified, use the default pool.
119  // * If no max thread count was specified, use the pool size.
120  // * If the calling thread is itself in the pool and the recursive flag
121  // was not turned on, just use one thread.
122  void resolve();
123 
124  constexpr bool singlethread() const noexcept { return m_maxthreads == 1; }
125 
126  constexpr int maxthreads() const noexcept { return m_maxthreads; }
127  paropt& maxthreads(int m) noexcept
128  {
129  m_maxthreads = m;
130  return *this;
131  }
132 
133  constexpr SplitDir splitdir() const noexcept { return m_splitdir; }
135  {
136  m_splitdir = s;
137  return *this;
138  }
139 
140  constexpr bool recursive() const noexcept { return m_recursive; }
141  paropt& recursive(bool r) noexcept
142  {
143  m_recursive = r;
144  return *this;
145  }
146 
147  constexpr int minitems() const noexcept { return m_minitems; }
148  paropt& minitems(int m) noexcept
149  {
150  m_minitems = m;
151  return *this;
152  }
153 
154  thread_pool* pool() const noexcept { return m_pool; }
155  paropt& pool(thread_pool* p) noexcept
156  {
157  m_pool = p;
158  return *this;
159  }
160 
161  constexpr ParStrategy strategy() const noexcept { return m_strategy; }
163  {
164  m_strategy = s;
165  return *this;
166  }
167 
168 private:
169  int m_maxthreads = 0; // Max threads (0 = use all)
170  ParStrategy m_strategy = ParStrategy::Default;
171  SplitDir m_splitdir = SplitDir::Y; // Primary split direction
172  size_t m_minitems = 16384; // Min items per task
173  thread_pool* m_pool = nullptr; // If non-NULL, custom thread pool
174  bool m_recursive = false; // Allow thread pool recursion
175 };
176 
177 
178 
179 /// Parallel "for" loop, chunked: for a task that takes an int64_t
180 /// [begin,end) range, break it into non-overlapping sections that run in
181 /// parallel:
182 ///
183 /// task (begin, begin+chunksize);
184 /// task (begin+chunksize, begin+2*chunksize);
185 /// ...
186 /// task (begin+n*chunksize, end);
187 ///
188 /// and wait for them all to complete.
189 ///
190 /// If chunksize is 0, a chunksize will be chosen to divide the range into
191 /// a number of chunks equal to twice the number of threads in the queue.
192 /// (We do this to offer better load balancing than if we used exactly the
193 /// thread count.)
194 OIIO_UTIL_API void
195 parallel_for_chunked(int64_t begin, int64_t end, int64_t chunksize,
196  std::function<void(int64_t, int64_t)>&& task,
197  paropt opt = paropt(0, paropt::SplitDir::Y, 1));
198 
199 
200 
201 /// Parallel "for" loop, for a task that takes a single integer index, run
202 /// it on all indices on the range [begin,end):
203 ///
204 /// task (begin);
205 /// task (begin+1);
206 /// ...
207 /// task (end-1);
208 ///
209 /// Conceptually, it behaves as if each index gets called separately, but
210 /// actually each thread will iterate over some chunk of adjacent indices
211 /// (to aid data coherence and minimize the amount of thread queue
212 /// diddling). The chunk size is chosen automatically.
213 OIIO_UTIL_API void
214 parallel_for(int32_t begin, int32_t end, function_view<void(int32_t)> task,
215  paropt opt = 0);
216 
217 OIIO_UTIL_API void
218 parallel_for(int64_t begin, int64_t end, function_view<void(int64_t)> task,
219  paropt opt = 0);
220 
221 OIIO_UTIL_API void
222 parallel_for(uint32_t begin, uint32_t end, function_view<void(uint32_t)> task,
223  paropt opt = 0);
224 
225 OIIO_UTIL_API void
226 parallel_for(uint64_t begin, uint64_t end, function_view<void(uint64_t)> task,
227  paropt opt = 0);
228 
229 
230 /// Parallel "for" loop, for a task that takes an integer range, run it
231 /// on all indices on the range [begin,end):
232 ///
233 /// task (begin, i1);
234 /// task (i1+1, i2);
235 /// ...
236 /// task (iN+1, end);
237 ///
238 /// The chunk sizes will be chosen automatically, and are not guaranteed
239 /// to all be the same size.
240 OIIO_UTIL_API void
241 parallel_for_range(int32_t begin, int32_t end,
242  std::function<void(int32_t, int32_t)>&& task,
243  paropt opt = 0);
244 
245 OIIO_UTIL_API void
246 parallel_for_range(int64_t begin, int64_t end,
247  std::function<void(int64_t, int64_t)>&& task,
248  paropt opt = 0);
249 
250 OIIO_UTIL_API void
251 parallel_for_range(uint32_t begin, uint32_t end,
252  std::function<void(uint32_t, uint32_t)>&& task,
253  paropt opt = 0);
254 
255 OIIO_UTIL_API void
256 parallel_for_range(uint64_t begin, uint64_t end,
257  std::function<void(uint64_t, uint64_t)>&& task,
258  paropt opt = 0);
259 
260 
261 /// Parallel "for" loop, chunked: for a task that takes a 2D [begin,end)
262 /// range and chunk sizes, subdivide that range and run in parallel:
263 ///
264 /// task (begin, begin+chunksize);
265 /// task (begin+chunksize, begin+2*chunksize);
266 /// ...
267 /// task (begin+n*chunksize, end);
268 ///
269 /// and wait for them all to complete.
270 ///
271 /// If chunksize is 0, a chunksize will be chosen to divide the range into
272 /// a number of chunks equal to twice the number of threads in the queue.
273 /// (We do this to offer better load balancing than if we used exactly the
274 /// thread count.)
275 OIIO_UTIL_API void
276 parallel_for_chunked_2D(int64_t xbegin, int64_t xend, int64_t xchunksize,
277  int64_t ybegin, int64_t yend, int64_t ychunksize,
278  std::function<void(int64_t xbeg, int64_t xend,
279  int64_t ybeg, int64_t yend)>&& task,
280  paropt opt = 0);
281 
282 
283 
284 /// parallel_for_2D, for a task that takes int64_t x & y indices, running
285 /// all of:
286 /// task (xbegin, ybegin);
287 /// ...
288 /// task (xend-1, ybegin);
289 /// task (xbegin, ybegin+1);
290 /// task (xend-1, ybegin+1);
291 /// ...
292 /// task (xend-1, yend-1);
293 OIIO_UTIL_API void
294 parallel_for_2D(int64_t xbegin, int64_t xend, int64_t ybegin, int64_t yend,
295  std::function<void(int64_t x, int64_t y)>&& task,
296  paropt opt = 0);
297 
298 
299 
300 #if OIIO_VERSION < OIIO_MAKE_VERSION(3, 0, 0)
301 
302 // Deprecated versions of parallel loops where the task functions take a
303 // thread ID in addition to the range. These are deprecated as of OIIO 2.3,
304 // will warn about deprecation starting in OIIO 2.4, and will be removed
305 // starting with OIIO 3.0.
306 
307 // OIIO_DEPRECATED("Use tasks that don't take a thread ID (2.3)")
308 OIIO_UTIL_API void
309 parallel_for_chunked(int64_t begin, int64_t end, int64_t chunksize,
310  std::function<void(int id, int64_t b, int64_t e)>&& task,
311  paropt opt = paropt(0, paropt::SplitDir::Y, 1));
312 
313 // OIIO_DEPRECATED("Use tasks that don't take a thread ID (2.3)")
314 OIIO_UTIL_API void
315 parallel_for(int64_t begin, int64_t end,
316  std::function<void(int id, int64_t index)>&& task,
317  paropt opt = paropt(0, paropt::SplitDir::Y, 1));
318 
319 // OIIO_DEPRECATED("Use tasks that don't take a thread ID (2.3)")
320 OIIO_UTIL_API void
322  int64_t xbegin, int64_t xend, int64_t xchunksize, int64_t ybegin,
323  int64_t yend, int64_t ychunksize,
324  std::function<void(int id, int64_t, int64_t, int64_t, int64_t)>&& task,
325  paropt opt = 0);
326 
327 // OIIO_DEPRECATED("Use tasks that don't take a thread ID (2.3)")
328 inline void
329 parallel_for_2D(int64_t xbegin, int64_t xend, int64_t ybegin, int64_t yend,
330  std::function<void(int id, int64_t i, int64_t j)>&& task,
331  paropt opt = 0)
332 {
334  xbegin, xend, 0, ybegin, yend, 0,
335  [&task](int id, int64_t xb, int64_t xe, int64_t yb, int64_t ye) {
336  for (auto y = yb; y < ye; ++y)
337  for (auto x = xb; x < xe; ++x)
338  task(id, x, y);
339  },
340  opt);
341 }
342 
343 // Deprecated parallel_for_each. We never used it and I decided I didn't
344 // like the implementation and didn't want its guts exposed any more. For
345 // compatibility (just in case somebody has used it), implement it serially
346 // so that it's correct, even if it's not fast. It will eventually be
347 // removed.
348 template<class InputIt, class UnaryFunction>
349 // OIIO_DEPRECATED("Don't use this (2.3)")
350 UnaryFunction
351 parallel_for_each(InputIt begin, InputIt end, UnaryFunction f,
352  paropt opt = paropt(0, paropt::SplitDir::Y, 1))
353 {
354  return std::for_each(begin, end, f);
355 }
356 
357 // DEPRECATED(1.8): This version accidentally accepted chunksizes that
358 // weren't used. Preserve for a version to not break 3rd party apps.
359 OIIO_DEPRECATED("Use the version without chunk sizes (1.8)")
360 inline void
361 parallel_for_2D(int64_t xbegin, int64_t xend, int64_t /*xchunksize*/,
362  int64_t ybegin, int64_t yend, int64_t /*ychunksize*/,
363  std::function<void(int id, int64_t i, int64_t j)>&& task)
364 {
365  parallel_for_2D(xbegin, xend, ybegin, yend, std::move(task));
366 }
367 
368 #endif /* Deprecated functions */
369 
370 
OIIO_UTIL_API void parallel_for_2D(int64_t xbegin, int64_t xend, int64_t ybegin, int64_t yend, std::function< void(int64_t x, int64_t y)> &&task, paropt opt=0)
OIIO_UTIL_API thread_pool * default_thread_pool()
Y
Definition: ImathEuler.h:184
thread_pool * pool
Definition: parallel.h:69
constexpr SplitDir splitdir() const noexcept
Definition: parallel.h:133
paropt & splitdir(SplitDir s) noexcept
Definition: parallel.h:134
void resolve()
Definition: parallel.h:53
GLdouble s
Definition: glad.h:3009
#define OIIO_DEPRECATED(msg)
Definition: platform.h:466
GLint y
Definition: glcorearb.h:103
paropt(string_view name, int maxthreads=0, SplitDir splitdir=SplitDir::Y, size_t minitems=1024) noexcept
Definition: parallel.h:91
String-related utilities, all in namespace Strutil.
#define OIIO_UTIL_API
Definition: export.h:71
bool singlethread() const
Definition: parallel.h:63
paropt & maxthreads(int m) noexcept
Definition: parallel.h:127
Wrappers and utilities for multithreading.
constexpr int maxthreads() const noexcept
Definition: parallel.h:126
paropt(const parallel_options &po) noexcept
Definition: parallel.h:110
constexpr paropt(ParStrategy strat) noexcept
Definition: parallel.h:98
GLfloat f
Definition: glcorearb.h:1926
paropt & recursive(bool r) noexcept
Definition: parallel.h:141
GLuint GLuint end
Definition: glcorearb.h:475
OIIO_UTIL_API void parallel_for(int32_t begin, int32_t end, function_view< void(int32_t)> task, paropt opt=0)
size_t minitems
Definition: parallel.h:68
ParStrategy
Definition: parallel.h:81
paropt & strategy(ParStrategy s) noexcept
Definition: parallel.h:162
parallel_options(int maxthreads=0, SplitDir splitdir=Split_Y, size_t minitems=16384)
Definition: parallel.h:32
GLuint const GLchar * name
Definition: glcorearb.h:786
OIIO_UTIL_API void parallel_for_chunked_2D(int64_t xbegin, int64_t xend, int64_t xchunksize, int64_t ybegin, int64_t yend, int64_t ychunksize, std::function< void(int64_t xbeg, int64_t xend, int64_t ybeg, int64_t yend)> &&task, paropt opt=0)
constexpr paropt(int maxthreads=0, SplitDir splitdir=SplitDir::Y, size_t minitems=1024) noexcept
Definition: parallel.h:84
GLboolean GLboolean GLboolean b
Definition: glcorearb.h:1222
GLint GLenum GLint x
Definition: glcorearb.h:409
constexpr paropt(int maxthreads, ParStrategy strat) noexcept
Definition: parallel.h:103
int size() const
How many threads are in the pool?
SplitDir splitdir
Definition: parallel.h:66
SplitDir
Definition: parallel.h:24
GLint j
Definition: glad.h:2733
paropt & minitems(int m) noexcept
Definition: parallel.h:148
SplitDir
Definition: parallel.h:82
constexpr ParStrategy strategy() const noexcept
Definition: parallel.h:161
constexpr int minitems() const noexcept
Definition: parallel.h:147
constexpr bool singlethread() const noexcept
Definition: parallel.h:124
GLuint index
Definition: glcorearb.h:786
constexpr bool recursive() const noexcept
Definition: parallel.h:140
OIIO_UTIL_API void parallel_for_chunked(int64_t begin, int64_t end, int64_t chunksize, std::function< void(int64_t, int64_t)> &&task, paropt opt=paropt(0, paropt::SplitDir::Y, 1))
paropt & pool(thread_pool *p) noexcept
Definition: parallel.h:155
parallel_options(string_view name, int maxthreads=0, SplitDir splitdir=Split_Y, size_t minitems=16384)
Definition: parallel.h:39
GLboolean r
Definition: glcorearb.h:1222
#define OIIO_NAMESPACE_END
Definition: oiioversion.h:127
string_view name
Definition: parallel.h:70
OIIO_UTIL_API void parallel_for_range(int32_t begin, int32_t end, std::function< void(int32_t, int32_t)> &&task, paropt opt=0)
bool is_worker(std::thread::id id) const
thread_pool * pool() const noexcept
Definition: parallel.h:154
#define OIIO_NAMESPACE_BEGIN
Definition: oiioversion.h:126
PcpNodeRef_ChildrenIterator begin(const PcpNodeRef::child_const_range &r)
Support for range-based for loops for PcpNodeRef children ranges.
Definition: node.h:566