HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
parallel.h
Go to the documentation of this file.
1 /*
2  Copyright 2016 Larry Gritz and the other authors and contributors.
3  All Rights Reserved.
4 
5  Redistribution and use in source and binary forms, with or without
6  modification, are permitted provided that the following conditions are
7  met:
8  * Redistributions of source code must retain the above copyright
9  notice, this list of conditions and the following disclaimer.
10  * Redistributions in binary form must reproduce the above copyright
11  notice, this list of conditions and the following disclaimer in the
12  documentation and/or other materials provided with the distribution.
13  * Neither the name of the software's owners nor the names of its
14  contributors may be used to endorse or promote products derived from
15  this software without specific prior written permission.
16  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 
28  (This is the Modified BSD License)
29 */
30 
31 // clang-format off
32 
33 #pragma once
34 
35 #include <algorithm>
36 #include <atomic>
37 #include <future>
38 #include <memory>
39 #include <mutex>
40 #include <thread>
41 #include <vector>
42 
43 #include <OpenImageIO/strutil.h>
44 #include <OpenImageIO/thread.h>
45 
46 
48 
49 /// Split strategies
51 
52 
53 /// Encapsulation of options that control parallel_image().
55 public:
57  size_t minitems = 16384)
61  {
62  }
64  SplitDir splitdir = Split_Y, size_t minitems = 16384)
68  , name(name)
69  {
70  }
71 
72  // Fix up all the TBD parameters:
73  // * If no pool was specified, use the default pool.
74  // * If no max thread count was specified, use the pool size.
75  // * If the calling thread is itself in the pool and the recursive flag
76  // was not turned on, just use one thread.
77  void resolve()
78  {
79  if (pool == nullptr)
81  if (maxthreads <= 0)
82  maxthreads = pool->size() + 1; // pool size + caller
83  if (!recursive && pool->is_worker())
84  maxthreads = 1;
85  }
86 
87  bool singlethread() const { return maxthreads == 1; }
88 
89  int maxthreads = 0; // Max threads (0 = use all)
90  SplitDir splitdir = Split_Y; // Primary split direction
91  bool recursive = false; // Allow thread pool recursion
92  size_t minitems = 16384; // Min items per task
93  thread_pool* pool = nullptr; // If non-NULL, custom thread pool
94  string_view name; // For debugging
95 };
96 
97 
98 
99 /// Parallel "for" loop, chunked: for a task that takes an int thread ID
100 /// followed by an int64_t [begin,end) range, break it into non-overlapping
101 /// sections that run in parallel using the default thread pool:
102 ///
103 /// task (threadid, start, start+chunksize);
104 /// task (threadid, start+chunksize, start+2*chunksize);
105 /// ...
106 /// task (threadid, start+n*chunksize, end);
107 ///
108 /// and wait for them all to complete.
109 ///
110 /// If chunksize is 0, a chunksize will be chosen to divide the range into
111 /// a number of chunks equal to the twice number of threads in the queue.
112 /// (We do this to offer better load balancing than if we used exactly the
113 /// thread count.)
114 ///
115 /// Note that the thread_id may be -1, indicating that it's being executed
116 /// by the calling thread itself, or perhaps some other helpful thread that
117 /// is stealing work from the pool.
118 OIIO_API void
119 parallel_for_chunked(int64_t start, int64_t end, int64_t chunksize,
120  std::function<void(int id, int64_t b, int64_t e)>&& task,
122 // Implementation is in thread.cpp
123 
124 
125 
126 /// Parallel "for" loop, chunked: for a task that takes a [begin,end) range
127 /// (but not a thread ID).
128 inline void
129 parallel_for_chunked(int64_t start, int64_t end, int64_t chunksize,
130  std::function<void(int64_t, int64_t)>&& task,
132 {
133  auto wrapper = [&](int id, int64_t b, int64_t e) { task(b, e); };
134  parallel_for_chunked(start, end, chunksize, wrapper, opt);
135 }
136 
137 
138 
139 /// Parallel "for" loop, for a task that takes a single int64_t index, run
140 /// it on all indices on the range [begin,end):
141 ///
142 /// task (begin);
143 /// task (begin+1);
144 /// ...
145 /// task (end-1);
146 ///
147 /// Using the default thread pool, spawn parallel jobs. Conceptually, it
148 /// behaves as if each index gets called separately, but actually each
149 /// thread will iterate over some chunk of adjacent indices (to aid data
150 /// coherence and minimuize the amount of thread queue diddling). The chunk
151 /// size is chosen automatically.
152 inline void
153 parallel_for (int64_t start, int64_t end,
154  std::function<void(int64_t index)>&& task,
156 {
157  parallel_for_chunked (start, end, 0, [&task](int id, int64_t i, int64_t e) {
158  for ( ; i < e; ++i)
159  task (i);
160  }, opt);
161 }
162 
163 
164 /// parallel_for, for a task that takes an int threadid and an int64_t
165 /// index, running all of:
166 /// task (id, begin);
167 /// task (id, begin+1);
168 /// ...
169 /// task (id, end-1);
170 inline void
171 parallel_for (int64_t start, int64_t end,
172  std::function<void(int id, int64_t index)>&& task,
174 {
175  parallel_for_chunked (start, end, 0, [&task](int id, int64_t i, int64_t e) {
176  for ( ; i < e; ++i)
177  task (id, i);
178  }, opt);
179 }
180 
181 
182 
183 /// parallel_for_each, semantically is like std::for_each(), but each
184 /// iteration is a separate job for the default thread pool.
185 template<class InputIt, class UnaryFunction>
186 UnaryFunction
187 parallel_for_each (InputIt first, InputIt last, UnaryFunction f,
189 {
190  opt.resolve ();
191  task_set ts (opt.pool);
192  for ( ; first != last; ++first) {
193  if (opt.singlethread() || opt.pool->very_busy()) {
194  // If we are using just one thread, or if the pool is already
195  // oversubscribed, do it ourselves and avoid messing with the
196  // queue or handing off between threads.
197  f (*first);
198  } else {
199  ts.push (opt.pool->push ([&](int id){ f(*first); }));
200  }
201  }
202  return std::move(f);
203 }
204 
205 
206 
207 /// Parallel "for" loop in 2D, chunked: for a task that takes an int thread
208 /// ID followed by begin, end, chunksize for each of x and y, subdivide that
209 /// run in parallel using the default thread pool.
210 ///
211 /// task (threadid, xstart, xstart+xchunksize, );
212 /// task (threadid, start+chunksize, start+2*chunksize);
213 /// ...
214 /// task (threadid, start+n*chunksize, end);
215 ///
216 /// and wait for them all to complete.
217 ///
218 /// If chunksize is 0, a chunksize will be chosen to divide the range into
219 /// a number of chunks equal to the twice number of threads in the queue.
220 /// (We do this to offer better load balancing than if we used exactly the
221 /// thread count.)
222 OIIO_API void
223 parallel_for_chunked_2D (int64_t xstart, int64_t xend, int64_t xchunksize,
224  int64_t ystart, int64_t yend, int64_t ychunksize,
225  std::function<void(int id, int64_t, int64_t,
226  int64_t, int64_t)>&& task,
227  parallel_options opt=0);
228 // Implementation is in thread.cpp
229 
230 
231 
232 /// Parallel "for" loop, chunked: for a task that takes a 2D [begin,end)
233 /// range and chunk sizes.
234 inline void
235 parallel_for_chunked_2D (int64_t xstart, int64_t xend, int64_t xchunksize,
236  int64_t ystart, int64_t yend, int64_t ychunksize,
237  std::function<void(int64_t, int64_t,
238  int64_t, int64_t)>&& task,
239  parallel_options opt=0)
240 {
241  auto wrapper = [&](int id, int64_t xb, int64_t xe,
242  int64_t yb, int64_t ye) { task(xb,xe,yb,ye); };
243  parallel_for_chunked_2D (xstart, xend, xchunksize,
244  ystart, yend, ychunksize, wrapper, opt);
245 }
246 
247 
248 
249 /// parallel_for, for a task that takes an int threadid and int64_t x & y
250 /// indices, running all of:
251 /// task (id, xstart, ystart);
252 /// ...
253 /// task (id, xend-1, ystart);
254 /// task (id, xstart, ystart+1);
255 /// task (id, xend-1, ystart+1);
256 /// ...
257 /// task (id, xend-1, yend-1);
258 inline void
259 parallel_for_2D (int64_t xstart, int64_t xend,
260  int64_t ystart, int64_t yend,
261  std::function<void(int id, int64_t i, int64_t j)>&& task,
262  parallel_options opt=0)
263 {
264  parallel_for_chunked_2D (xstart, xend, 0, ystart, yend, 0,
265  [&task](int id, int64_t xb, int64_t xe, int64_t yb, int64_t ye) {
266  for (auto y = yb; y < ye; ++y)
267  for (auto x = xb; x < xe; ++x)
268  task (id, x, y);
269  }, opt);
270 }
271 
272 
273 
274 /// parallel_for, for a task that takes an int threadid and int64_t x & y
275 /// indices, running all of:
276 /// task (xstart, ystart);
277 /// ...
278 /// task (xend-1, ystart);
279 /// task (xstart, ystart+1);
280 /// task (xend-1, ystart+1);
281 /// ...
282 /// task (xend-1, yend-1);
283 inline void
284 parallel_for_2D (int64_t xstart, int64_t xend,
285  int64_t ystart, int64_t yend,
286  std::function<void(int64_t i, int64_t j)>&& task,
287  parallel_options opt=0)
288 {
289  parallel_for_chunked_2D (xstart, xend, 0, ystart, yend, 0,
290  [&task](int id, int64_t xb, int64_t xe, int64_t yb, int64_t ye) {
291  for (auto y = yb; y < ye; ++y)
292  for (auto x = xb; x < xe; ++x)
293  task (x, y);
294  }, opt);
295 }
296 
297 
298 
299 // DEPRECATED(1.8): This version accidentally accepted chunksizes that
300 // weren't used. Preserve for a version to not break 3rd party apps.
301 OIIO_DEPRECATED("Use the version without chunk sizes (1.8)")
302 inline void
303 parallel_for_2D (int64_t xstart, int64_t xend, int64_t xchunksize,
304  int64_t ystart, int64_t yend, int64_t ychunksize,
305  std::function<void(int id, int64_t i, int64_t j)>&& task)
306 {
307  parallel_for_2D (xstart, xend, ystart, yend,
308  std::forward<std::function<void(int,int64_t,int64_t)>>(task));
309 }
310 
void parallel_for(int64_t start, int64_t end, std::function< void(int64_t index)> &&task, parallel_options opt=parallel_options(0, Split_Y, 1))
Definition: parallel.h:153
GLuint id
Definition: glew.h:1679
GLuint const GLchar * name
Definition: glew.h:1814
thread_pool * pool
Definition: parallel.h:93
GLuint index
Definition: glew.h:1814
Encapsulation of options that control parallel_image().
Definition: parallel.h:54
void resolve()
Definition: parallel.h:77
OIIO_API void parallel_for_chunked(int64_t start, int64_t end, int64_t chunksize, std::function< void(int id, int64_t b, int64_t e)> &&task, parallel_options opt=parallel_options(0, Split_Y, 1))
OIIO_API void parallel_for_chunked_2D(int64_t xstart, int64_t xend, int64_t xchunksize, int64_t ystart, int64_t yend, int64_t ychunksize, std::function< void(int id, int64_t, int64_t, int64_t, int64_t)> &&task, parallel_options opt=0)
const GLint * first
Definition: glew.h:1528
#define OIIO_DEPRECATED(msg)
Definition: platform.h:310
bool is_worker(std::thread::id id)
String-related utilities, all in namespace Strutil.
bool singlethread() const
Definition: parallel.h:87
void parallel_for_2D(int64_t xstart, int64_t xend, int64_t ystart, int64_t yend, std::function< void(int id, int64_t i, int64_t j)> &&task, parallel_options opt=0)
Definition: parallel.h:259
Wrappers and utilities for multithreading.
GLclampf f
Definition: glew.h:3499
GLint GLint GLint GLint GLint x
Definition: glew.h:1252
GLint GLint GLint GLint GLint GLint y
Definition: glew.h:1252
UnaryFunction parallel_for_each(InputIt first, InputIt last, UnaryFunction f, parallel_options opt=parallel_options(0, Split_Y, 1))
Definition: parallel.h:187
OIIO_API thread_pool * default_thread_pool()
GLuint GLuint end
Definition: glew.h:1253
size_t minitems
Definition: parallel.h:92
parallel_options(int maxthreads=0, SplitDir splitdir=Split_Y, size_t minitems=16384)
Definition: parallel.h:56
int size() const
How many threads are in the pool?
GLuint start
Definition: glew.h:1253
SplitDir splitdir
Definition: parallel.h:90
SplitDir
Split strategies.
Definition: parallel.h:50
GLdouble GLdouble GLdouble b
Definition: glew.h:9122
void push(std::future< void > &&f)
Definition: thread.h:823
parallel_options(string_view name, int maxthreads=0, SplitDir splitdir=Split_Y, size_t minitems=16384)
Definition: parallel.h:63
#define OIIO_NAMESPACE_END
Definition: oiioversion.h:66
string_view name
Definition: parallel.h:94
#define OIIO_NAMESPACE_BEGIN
Definition: oiioversion.h:65
#define OIIO_API
Definition: export.h:91