HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
parallel.h
Go to the documentation of this file.
1 // Copyright 2008-present Contributors to the OpenImageIO project.
2 // SPDX-License-Identifier: BSD-3-Clause
3 // https://github.com/OpenImageIO/oiio/blob/master/LICENSE.md
4 
5 // clang-format off
6 
7 #pragma once
8 
9 #include <algorithm>
10 #include <atomic>
11 #include <future>
12 #include <memory>
13 #include <mutex>
14 #include <thread>
15 #include <vector>
16 
17 #include <OpenImageIO/strutil.h>
18 #include <OpenImageIO/thread.h>
19 
20 
22 
23 /// Split strategies
25 
26 
27 /// Encapsulation of options that control parallel_image().
29 public:
31  size_t minitems = 16384)
35  {
36  }
38  SplitDir splitdir = Split_Y, size_t minitems = 16384)
42  , name(name)
43  {
44  }
45 
46  // Fix up all the TBD parameters:
47  // * If no pool was specified, use the default pool.
48  // * If no max thread count was specified, use the pool size.
49  // * If the calling thread is itself in the pool and the recursive flag
50  // was not turned on, just use one thread.
51  void resolve()
52  {
53  if (pool == nullptr)
55  if (maxthreads <= 0)
56  maxthreads = pool->size() + 1; // pool size + caller
57  if (!recursive && pool->is_worker())
58  maxthreads = 1;
59  }
60 
61  bool singlethread() const { return maxthreads == 1; }
62 
63  int maxthreads = 0; // Max threads (0 = use all)
64  SplitDir splitdir = Split_Y; // Primary split direction
65  bool recursive = false; // Allow thread pool recursion
66  size_t minitems = 16384; // Min items per task
67  thread_pool* pool = nullptr; // If non-NULL, custom thread pool
68  string_view name; // For debugging
69 };
70 
71 
72 
73 /// Parallel "for" loop, chunked: for a task that takes an int thread ID
74 /// followed by an int64_t [begin,end) range, break it into non-overlapping
75 /// sections that run in parallel using the default thread pool:
76 ///
77 /// task (threadid, start, start+chunksize);
78 /// task (threadid, start+chunksize, start+2*chunksize);
79 /// ...
80 /// task (threadid, start+n*chunksize, end);
81 ///
82 /// and wait for them all to complete.
83 ///
84 /// If chunksize is 0, a chunksize will be chosen to divide the range into
85 /// a number of chunks equal to twice the number of threads in the queue.
86 /// (We do this to offer better load balancing than if we used exactly the
87 /// thread count.)
88 ///
89 /// Note that the thread_id may be -1, indicating that it's being executed
90 /// by the calling thread itself, or perhaps some other helpful thread that
91 /// is stealing work from the pool.
92 OIIO_API void
93 parallel_for_chunked(int64_t start, int64_t end, int64_t chunksize,
94  std::function<void(int id, int64_t b, int64_t e)>&& task,
96 // Implementation is in thread.cpp
97 
98 
99 
100 /// Parallel "for" loop, chunked: for a task that takes a [begin,end) range
101 /// (but not a thread ID).
102 inline void
103 parallel_for_chunked(int64_t start, int64_t end, int64_t chunksize,
104  std::function<void(int64_t, int64_t)>&& task,
106 {
107  auto wrapper = [&](int /*id*/, int64_t b, int64_t e) { task(b, e); };
108  parallel_for_chunked(start, end, chunksize, wrapper, opt);
109 }
110 
111 
112 
113 /// Parallel "for" loop, for a task that takes a single int64_t index, run
114 /// it on all indices on the range [begin,end):
115 ///
116 /// task (begin);
117 /// task (begin+1);
118 /// ...
119 /// task (end-1);
120 ///
121 /// Using the default thread pool, spawn parallel jobs. Conceptually, it
122 /// behaves as if each index gets called separately, but actually each
123 /// thread will iterate over some chunk of adjacent indices (to aid data
124 /// coherence and minimize the amount of thread queue diddling). The chunk
125 /// size is chosen automatically.
126 inline void
127 parallel_for (int64_t start, int64_t end,
128  std::function<void(int64_t index)>&& task,
130 {
131  parallel_for_chunked (start, end, 0, [&task](int /*id*/, int64_t i, int64_t e) {
132  for ( ; i < e; ++i)
133  task (i);
134  }, opt);
135 }
136 
137 
138 /// parallel_for, for a task that takes an int threadid and an int64_t
139 /// index, running all of:
140 /// task (id, begin);
141 /// task (id, begin+1);
142 /// ...
143 /// task (id, end-1);
144 inline void
145 parallel_for (int64_t start, int64_t end,
146  std::function<void(int id, int64_t index)>&& task,
148 {
149  parallel_for_chunked (start, end, 0, [&task](int id, int64_t i, int64_t e) {
150  for ( ; i < e; ++i)
151  task (id, i);
152  }, opt);
153 }
154 
155 
156 
157 /// parallel_for_each, semantically is like std::for_each(), but each
158 /// iteration is a separate job for the default thread pool.
159 template<class InputIt, class UnaryFunction>
160 UnaryFunction
161 parallel_for_each (InputIt first, InputIt last, UnaryFunction f,
163 {
164  opt.resolve ();
165  task_set ts (opt.pool);
166  for ( ; first != last; ++first) {
167  if (opt.singlethread() || opt.pool->very_busy()) {
168  // If we are using just one thread, or if the pool is already
169  // oversubscribed, do it ourselves and avoid messing with the
170  // queue or handing off between threads.
171  f (*first);
172  } else {
173  ts.push (opt.pool->push ([&](int /*id*/){ f(*first); }));
174  }
175  }
176  return std::move(f);
177 }
178 
179 
180 
181 /// Parallel "for" loop in 2D, chunked: for a task that takes an int thread
182 /// ID followed by a [begin,end) range for each of x and y, subdivide the
183 /// 2D region into tiles that run in parallel using the default thread pool.
184 ///
185 /// task (threadid, xstart, xstart+xchunksize, ystart, ystart+ychunksize);
186 /// task (threadid, xstart+xchunksize, xstart+2*xchunksize, ystart, ystart+ychunksize);
187 /// ...
188 /// task (threadid, xstart+n*xchunksize, xend, ystart+m*ychunksize, yend);
189 ///
190 /// and wait for them all to complete.
191 ///
192 /// If chunksize is 0, a chunksize will be chosen to divide the range into
193 /// a number of chunks equal to twice the number of threads in the queue.
194 /// (We do this to offer better load balancing than if we used exactly the
195 /// thread count.)
196 OIIO_API void
197 parallel_for_chunked_2D (int64_t xstart, int64_t xend, int64_t xchunksize,
198  int64_t ystart, int64_t yend, int64_t ychunksize,
199  std::function<void(int id, int64_t, int64_t,
200  int64_t, int64_t)>&& task,
201  parallel_options opt=0);
202 // Implementation is in thread.cpp
203 
204 
205 
206 /// Parallel "for" loop, chunked: for a task that takes a 2D [begin,end)
207 /// range and chunk sizes.
208 inline void
209 parallel_for_chunked_2D (int64_t xstart, int64_t xend, int64_t xchunksize,
210  int64_t ystart, int64_t yend, int64_t ychunksize,
211  std::function<void(int64_t, int64_t,
212  int64_t, int64_t)>&& task,
213  parallel_options opt=0)
214 {
215  auto wrapper = [&](int /*id*/, int64_t xb, int64_t xe,
216  int64_t yb, int64_t ye) { task(xb,xe,yb,ye); };
217  parallel_for_chunked_2D (xstart, xend, xchunksize,
218  ystart, yend, ychunksize, wrapper, opt);
219 }
220 
221 
222 
223 /// parallel_for, for a task that takes an int threadid and int64_t x & y
224 /// indices, running all of:
225 /// task (id, xstart, ystart);
226 /// ...
227 /// task (id, xend-1, ystart);
228 /// task (id, xstart, ystart+1);
229 /// task (id, xend-1, ystart+1);
230 /// ...
231 /// task (id, xend-1, yend-1);
232 inline void
233 parallel_for_2D (int64_t xstart, int64_t xend,
234  int64_t ystart, int64_t yend,
235  std::function<void(int id, int64_t i, int64_t j)>&& task,
236  parallel_options opt=0)
237 {
238  parallel_for_chunked_2D (xstart, xend, 0, ystart, yend, 0,
239  [&task](int id, int64_t xb, int64_t xe, int64_t yb, int64_t ye) {
240  for (auto y = yb; y < ye; ++y)
241  for (auto x = xb; x < xe; ++x)
242  task (id, x, y);
243  }, opt);
244 }
245 
246 
247 
248 /// parallel_for_2D, for a task that takes int64_t x & y indices (but not
249 /// a thread ID), running all of:
250 /// task (xstart, ystart);
251 /// ...
252 /// task (xend-1, ystart);
253 /// task (xstart, ystart+1);
254 /// task (xend-1, ystart+1);
255 /// ...
256 /// task (xend-1, yend-1);
257 inline void
258 parallel_for_2D (int64_t xstart, int64_t xend,
259  int64_t ystart, int64_t yend,
260  std::function<void(int64_t i, int64_t j)>&& task,
261  parallel_options opt=0)
262 {
263  parallel_for_chunked_2D (xstart, xend, 0, ystart, yend, 0,
264  [&task](int /*id*/, int64_t xb, int64_t xe, int64_t yb, int64_t ye) {
265  for (auto y = yb; y < ye; ++y)
266  for (auto x = xb; x < xe; ++x)
267  task (x, y);
268  }, opt);
269 }
270 
271 
272 
273 // DEPRECATED(1.8): This version accidentally accepted chunksizes that
274 // weren't used. Preserve for a version to not break 3rd party apps.
275 OIIO_DEPRECATED("Use the version without chunk sizes (1.8)")
276 inline void
277 parallel_for_2D (int64_t xstart, int64_t xend, int64_t /*xchunksize*/,
278  int64_t ystart, int64_t yend, int64_t /*ychunksize*/,
279  std::function<void(int id, int64_t i, int64_t j)>&& task)
280 {
281  parallel_for_2D (xstart, xend, ystart, yend,
282  std::forward<std::function<void(int,int64_t,int64_t)>>(task));
283 }
284 
void parallel_for(int64_t start, int64_t end, std::function< void(int64_t index)> &&task, parallel_options opt=parallel_options(0, Split_Y, 1))
Definition: parallel.h:127
GLboolean GLboolean GLboolean b
Definition: glcorearb.h:1221
GLint first
Definition: glcorearb.h:404
thread_pool * pool
Definition: parallel.h:67
Encapsulation of options that control parallel_image().
Definition: parallel.h:28
GLuint start
Definition: glcorearb.h:474
void resolve()
Definition: parallel.h:51
OIIO_API void parallel_for_chunked(int64_t start, int64_t end, int64_t chunksize, std::function< void(int id, int64_t b, int64_t e)> &&task, parallel_options opt=parallel_options(0, Split_Y, 1))
OIIO_API void parallel_for_chunked_2D(int64_t xstart, int64_t xend, int64_t xchunksize, int64_t ystart, int64_t yend, int64_t ychunksize, std::function< void(int id, int64_t, int64_t, int64_t, int64_t)> &&task, parallel_options opt=0)
#define OIIO_DEPRECATED(msg)
Definition: platform.h:421
GLuint const GLchar * name
Definition: glcorearb.h:785
String-related utilities, all in namespace Strutil.
bool singlethread() const
Definition: parallel.h:61
GLint GLenum GLint x
Definition: glcorearb.h:408
void parallel_for_2D(int64_t xstart, int64_t xend, int64_t ystart, int64_t yend, std::function< void(int id, int64_t i, int64_t j)> &&task, parallel_options opt=0)
Definition: parallel.h:233
Wrappers and utilities for multithreading.
UnaryFunction parallel_for_each(InputIt first, InputIt last, UnaryFunction f, parallel_options opt=parallel_options(0, Split_Y, 1))
Definition: parallel.h:161
OIIO_API thread_pool * default_thread_pool()
GLuint GLuint end
Definition: glcorearb.h:474
size_t minitems
Definition: parallel.h:66
parallel_options(int maxthreads=0, SplitDir splitdir=Split_Y, size_t minitems=16384)
Definition: parallel.h:30
int size() const
How many threads are in the pool?
SplitDir splitdir
Definition: parallel.h:64
SplitDir
Split strategies.
Definition: parallel.h:24
void push(std::future< void > &&f)
Definition: thread.h:814
GLuint index
Definition: glcorearb.h:785
GLfloat f
Definition: glcorearb.h:1925
parallel_options(string_view name, int maxthreads=0, SplitDir splitdir=Split_Y, size_t minitems=16384)
Definition: parallel.h:37
#define OIIO_NAMESPACE_END
Definition: oiioversion.h:94
string_view name
Definition: parallel.h:68
GLint y
Definition: glcorearb.h:102
bool is_worker(std::thread::id id) const
#define OIIO_NAMESPACE_BEGIN
Definition: oiioversion.h:93
#define OIIO_API
Definition: export.h:65