/*
 * PROPRIETARY INFORMATION.  This software is proprietary to
 * Side Effects Software Inc., and is not to be reproduced,
 * transmitted, or disclosed in any way without written permission.
 *
 * NAME:        UT_ParallelUtil.h ( UT Library, C++)
 *
 * COMMENTS:    Simple wrappers on tbb interface
 *
 * RELATION TO THE STL:
 *
 * Use UT_ParallelUtil.h (or if necessary, UT_StdThread) instead
 * of std::thread.
 *
 * Reasoning:
 *
 * Houdini requires tight control over the number of threads as
 * we try to follow the command line -j option.
 * This is important for Houdini to play nicely on farms where
 * we may get a slice of a machine.
 * Some oversubscription is a feature, but too much is not.
 * We use TBB currently to ensure composability of threading -
 * your algorithm does not run in a vacuum but must thread nicely
 * with other algorithms at the same time, so you should never
 * assume you get # CPU threads.
 *
 * We also need careful control of task stealing, which requires
 * setting up thread groups. We thus must have a centralized
 * location where all threads are created.
 */

#ifndef __UT_ParallelUtil__
#define __UT_ParallelUtil__

#include "UT_API.h"

#include "UT_Array.h"
#include "UT_PerformanceThread.h"
#include "UT_TaskScope.h"
#include "UT_TBBParallelInvoke.h"
#include "UT_Thread.h"
#include "UT_IteratorRange.h"
#include "UT_Optional.h"

#include <oneapi/tbb/blocked_range.h>
#include <oneapi/tbb/blocked_range2d.h>
#include <oneapi/tbb/parallel_for.h>
#include <oneapi/tbb/parallel_reduce.h>
#include <oneapi/tbb/parallel_sort.h>
#include <oneapi/tbb/task.h>
#include <oneapi/tbb/task_arena.h>

/// Typedef to denote the "split" constructor of a range
typedef tbb::split UT_Split;

/// Declare prior to use.
template <typename T>
class UT_BlockedRange;

template <typename RowT, typename ColT=RowT>
class UT_BlockedRange2D;

// Default implementation that calls range.size()
template< typename RANGE >
struct UT_EstimatorNumItems
{
    UT_EstimatorNumItems() {}

    size_t operator()(const RANGE& range) const
    {
        return range.size();
    }
};

// Partial specialization for UT_BlockedRange2D<T>
template< typename T >
struct UT_EstimatorNumItems< UT_BlockedRange2D<T> >
{
    UT_EstimatorNumItems() {}

    size_t operator()(const UT_BlockedRange2D<T>& range) const
    {
        return range.rows().size() * range.cols().size();
    }
};

/// This is needed by UT_CoarsenedRange
template <typename RANGE>
inline size_t UTestimatedNumItems(const RANGE& range)
{
    return UT_EstimatorNumItems<RANGE>()(range);
}
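
/// Example: a minimal sketch of providing an estimator for a custom range
/// type whose work count is not range.size(). @c MyPairRange is hypothetical
/// and not part of this header; the pattern mirrors the UT_BlockedRange2D
/// specialization above.
/// @code
///     template <>
///     struct UT_EstimatorNumItems<MyPairRange>
///     {
///         size_t operator()(const MyPairRange &range) const
///         { return range.outer().size() * range.inner().size(); }
///     };
/// @endcode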

/// UT_CoarsenedRange: This should be used only inside
/// UT_ParallelFor and UT_ParallelReduce.
/// This class wraps an existing range with a new range.
/// This allows us to use simple_partitioner, rather than
/// auto_partitioner, which has disastrous performance with
/// the default grain size in tbb 4.
template< typename RANGE >
class UT_CoarsenedRange : public RANGE
{
public:
    // Compiler-generated versions are fine:
    //  ~UT_CoarsenedRange();
    //  UT_CoarsenedRange(const UT_CoarsenedRange&);

    // Split into two sub-ranges:
    UT_CoarsenedRange(UT_CoarsenedRange& range, tbb::split spl) :
        RANGE(range, spl),
        myGrainSize(range.myGrainSize)
    {
    }

    // Inherited: bool empty() const

    bool is_divisible() const
    {
        return
            RANGE::is_divisible() &&
            (UTestimatedNumItems(static_cast<const RANGE&>(*this)) > myGrainSize);
    }

private:
    size_t myGrainSize;

    UT_CoarsenedRange(const RANGE& base_range, const size_t grain_size) :
        RANGE(base_range),
        myGrainSize(grain_size)
    {
    }

    template <typename Range, typename Body>
    friend void UTparallelFor(
        const Range &range, const Body &body,
        const int subscribe_ratio, const int min_grain_size,
        const bool force_use_task_scope
    );
    template <typename Range, typename Body>
    friend void UTparallelReduce(
        const Range &range, Body &body,
        const int subscribe_ratio, const int min_grain_size,
        const bool force_use_taskscope
    );
    template <typename Range, typename Body>
    friend void UTparallelDeterministicReduce(
        const Range &range, Body &body, const int grain_size,
        const bool force_use_taskscope
    );
};

/// Helper class for UTparallelFor().
/// Wraps the thread body in a task scope so that thread stats are collected
/// by the performance monitor, and child tasks can inherit task scope locks
/// from the parent task.
template<typename Range, typename Body>
class ut_TaskScopedBody
{
public:
    ut_TaskScopedBody(const Body *body)
        : myBody(body),
          myParentTaskScope(UT_TaskScope::getCurrent())
    {
    }

    ut_TaskScopedBody(const ut_TaskScopedBody &src)
        : myBody(src.myBody),
          myParentTaskScope(src.myParentTaskScope)
    {
    }

    void operator()(const Range &r) const
    {
        UT_TaskScope task_scope(myParentTaskScope);
        (*myBody)(r);
    }

private:
    const Body *myBody;
    const UT_TaskScope *myParentTaskScope;
};

/// Helper class for UTparallelFor().
/// Wraps the thread body allowing non-copyable bodies to be used with
/// UTparallelFor().
template<typename Range, typename Body>
class ut_TaskBody
{
public:
    ut_TaskBody(const Body *body) : myBody(body) {}
    void operator()(const Range &r) const { (*myBody)(r); }

private:
    const Body *myBody;
};

/// Helper class for UTparallelForEachNumber()
/// This wraps the thread body to perform different load balancing based on
/// peeling off tasks using an atomic int to iterate over the range.
/// @c IntType must be an integer type supported by @c SYS_AtomicInt (currently
/// int32 or int64).
template <typename IntType, typename Body>
class ut_ForEachNumberBody
{
public:
    ut_ForEachNumberBody(const Body &body,
            SYS_AtomicInt<IntType> &it, IntType end)
        : myBody(body)
        , myIt(it)
        , myEnd(end)
    {
    }
    void operator()(const UT_BlockedRange<IntType> &range) const
    {
        while (true)
        {
            IntType it = myIt.exchangeAdd(1);
            if (it >= myEnd)
                break;
            myBody(UT_BlockedRange<IntType>(it, it+1));
        }
    }
private:
    const Body &myBody;
    SYS_AtomicInt<IntType> &myIt;
    IntType myEnd;
};

/// Run the @c body function over a range in parallel.
/// UTparallelFor attempts to spread the range out over at most
/// subscribe_ratio * num_processor tasks.
/// The factor subscribe_ratio can be used to help balance the load.
/// UTparallelFor() uses tbb for its implementation.
/// The grain size used is the maximum of min_grain_size and
/// UTestimatedNumItems(range) / (subscribe_ratio * num_processor).
/// If subscribe_ratio == 0, then a grain size of min_grain_size will be used.
/// A range can be split only when UTestimatedNumItems(range) exceeds the
/// grain size and the range is divisible.
///
/// Requirements for the Range functor are:
/// - the requirements of the tbb Range Concept
/// - UT_EstimatorNumItems<Range> must return the estimated number of work
///   items for the range. When Range::size() is not the correct estimate,
///   then a (partial) specialization of UT_EstimatorNumItems must be
///   provided for the type Range.
///
/// Requirements for the Body function are:
/// - @code Body(const Body &); @endcode @n
///   Copy Constructor
/// - @code Body::~Body(); @endcode @n
///   Destructor
/// - @code void Body::operator()(const Range &range) const; @endcode
///   Function call to perform operation on the range. Note the operator is
///   @b const.
///
/// The requirements for a Range object are:
/// - @code Range::Range(const Range&); @endcode @n
///   Copy constructor
/// - @code Range::~Range(); @endcode @n
///   Destructor
/// - @code bool Range::is_divisible() const; @endcode @n
///   True if the range can be partitioned into two sub-ranges
/// - @code bool Range::empty() const; @endcode @n
///   True if the range is empty
/// - @code Range::Range(Range &r, UT_Split); @endcode @n
///   Split the range @c r into two sub-ranges (i.e. modify @c r and *this)
///
/// Example: @code
///     class Square
///     {
///     public:
///         Square(fpreal *data) : myData(data) {}
///         ~Square();
///         void operator()(const UT_BlockedRange<int64> &range) const
///         {
///             for (int64 i = range.begin(); i != range.end(); ++i)
///                 myData[i] *= myData[i];
///         }
///         fpreal *myData;
///     };
///     ...
///
///     void
///     parallel_square(fpreal *array, int64 length)
///     {
///         UTparallelFor(UT_BlockedRange<int64>(0, length), Square(array));
///     }
/// @endcode
///
/// @see UTparallelReduce(), UT_BlockedRange()

template <typename Range, typename Body>
void UTparallelFor(
        const Range &range, const Body &body,
        const int subscribe_ratio = 2,
        const int min_grain_size = 1,
        const bool force_use_task_scope = true
)
{
    const size_t num_processors( UT_Thread::getNumProcessors() );

    UT_ASSERT( num_processors >= 1 );
    UT_ASSERT( min_grain_size >= 1 );
    UT_ASSERT( subscribe_ratio >= 0 );

    const size_t est_range_size( UTestimatedNumItems(range) );

    // Don't run on an empty range!
    if (est_range_size == 0)
        return;

    // Avoid tbb overhead if entire range needs to be single threaded
    if (num_processors == 1 || est_range_size <= min_grain_size ||
        !UT_Thread::isThreadingEnabled())
    {
        body(range);
        return;
    }

    size_t grain_size(min_grain_size);
    if( subscribe_ratio > 0 )
        grain_size = std::max(
                grain_size,
                est_range_size / (subscribe_ratio * num_processors)
        );

    UT_CoarsenedRange< Range > coarsened_range(range, grain_size);

    if (force_use_task_scope || UTperformanceIsRecordingThreadStats())
    {
        tbb::parallel_for(
            coarsened_range, ut_TaskScopedBody<Range, Body>(&body),
            tbb::simple_partitioner());
    }
    else
    {
        tbb::parallel_for(
            coarsened_range, ut_TaskBody<Range, Body>(&body),
            tbb::simple_partitioner());
    }
}

/// Version of UTparallelFor that always creates a task scope to prevent
/// deadlocking of child tasks that might acquire UT_TaskLocks.
template <typename Range, typename Body>
void UTparallelForTaskScope(
        const Range &range, const Body &body,
        const int subscribe_ratio = 2,
        const int min_grain_size = 1
)
{
    UTparallelFor(range, body, subscribe_ratio, min_grain_size, true);
}

/// Version of UTparallelFor that is tuned for the case where the range
/// consists of lightweight items, for example,
/// float additions or matrix-vector multiplications.
template <typename Range, typename Body>
void
UTparallelForLightItems(const Range &range, const Body &body,
        const bool force_use_task_scope = true)
{
    UTparallelFor(range, body, 2, 1024, force_use_task_scope);
}

/// Version of UTparallelFor that is tuned for the case where the range
/// consists of heavy items, for example, defragmenting an entire attribute.
///
/// If possible, UTparallelForEachNumber() is preferred over use of
/// UTparallelForHeavyItems().
///
/// Note, when the range is guaranteed to be small, you might prefer to run
/// <tt>UTparallelFor(range, body, 0, 1)</tt>. That form of the loop would
/// guarantee that a separate task is started for each iteration of the body.
/// However, that form can cause issues when the range gets large, in that a
/// @b large number of tasks may be created.
///
template <typename Range, typename Body>
SYS_DEPRECATED_REPLACE(16.5, "UTparallelForEachNumber||UTparallelFor(r,b,0,1)")
void
UTparallelForHeavyItems(const Range &range, const Body &body)
{
    // By oversubscribing by 32, small ranges will still be split into
    // individual tasks. However, large ranges will be chunked, causing fewer
    // tasks, but potentially worse load balancing.
    //
    // Consider using UTparallelForEachNumber() instead.
    UTparallelFor(range, body, 32, 1, /*force_use_task=*/true);
}

/// Version of UTparallelFor tuned for a range that consists of heavy items,
/// for example, defragmenting an entire attribute.
///
/// This approach uses "ideal" load balancing across threads and doesn't rely
/// on the TBB task scheduler for splitting the range. Instead, it iterates
/// from @c 0 to @c nitems, calling @c body with a UT_BlockedRange<IntType>
/// containing a list of tasks to execute.
///
/// @note The @c IntType must work with @c SYS_AtomicInt (currently int32 or
/// int64). If you get a boost static assertion, please make sure the @c body
/// range takes the proper integer type.
template <typename IntType, typename Body>
void
UTparallelForEachNumber(IntType nitems, const Body &body, const bool force_use_task_scope = true)
{
    const size_t num_processors(UT_Thread::getNumProcessors());

    UT_ASSERT(num_processors >= 1);
    if (nitems == 0)
        return;
    if (num_processors == 1)
    {
        body(UT_BlockedRange<IntType>(0, nitems));
        return;
    }
    if (nitems <= num_processors)
    {
        // When there are a small number of tasks, split into a single task
        // per thread.
        UTparallelFor(UT_BlockedRange<IntType>(0, nitems), body, 0, 1,
                      force_use_task_scope);
        return;
    }

    // Split across the number of processors, with each thread using the
    // atomic int to query the next task to be run (similar to
    // UT_ThreadedAlgorithm)
    SYS_AtomicInt<IntType> it(0);
    UTparallelFor(UT_BlockedRange<IntType>(0, num_processors),
        ut_ForEachNumberBody<IntType, Body>(body, it, nitems), 0, 1,
        force_use_task_scope);
}
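
/// Example: a minimal sketch of calling UTparallelForEachNumber() with a
/// lambda body. @c nitems and @c processItem are hypothetical and not part
/// of this header.
/// @code
///     UTparallelForEachNumber((int64)nitems,
///         [&](const UT_BlockedRange<int64> &r)
///         {
///             // Each range here typically covers a single item.
///             for (int64 i = r.begin(); i != r.end(); ++i)
///                 processItem(i);
///         });
/// @endcode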

/// UTserialForEachNumber can be used as a debugging tool to quickly replace a
/// parallel for with a serial for.
template <typename IntType, typename Body>
void
UTserialForEachNumber(IntType nitems, const Body &body, bool usetaskscope = true)
{
    for (IntType i = 0; i < nitems; ++i)
        body(UT_BlockedRange<IntType>(i, i + 1));
}

/// Version of UTparallelForEachNumber that wraps the body in a UT_TaskScope
/// that makes it safe to use UT_TaskLock objects that are currently locked by
/// the parent scope.
template <typename IntType, typename Body>
void
UTparallelForEachNumberTaskScope(IntType nitems, const Body &body)
{
    UTparallelForEachNumber(nitems, body, /*force_use_task_scope=*/true);
}

/// UTserialFor can be used as a debugging tool to quickly replace a parallel
/// for with a serial for.
template <typename Range, typename Body>
void UTserialFor(const Range &range, const Body &body)
        { body(range); }

/// Helper class for UTparallelInvoke().
/// Wraps the thread body in a task scope so that thread stats are collected
/// by the performance monitor, and child tasks can inherit task scope locks
/// from the parent task.
template<typename Body>
class ut_TaskScopedInvokeBody
{
public:
    ut_TaskScopedInvokeBody(const Body &body)
        : myBody(body),
          myParentTaskScope(UT_TaskScope::getCurrent())
    {
    }

    ut_TaskScopedInvokeBody(const ut_TaskScopedInvokeBody &src)
        : myBody(src.myBody),
          myParentTaskScope(src.myParentTaskScope)
    {
    }

    void operator()() const
    {
        UT_TaskScope task_scope(myParentTaskScope);
        myBody();
    }

private:
    const Body &myBody;
    const UT_TaskScope *myParentTaskScope;
};

/// Takes a functor for passing to UTparallelInvoke, and wraps it in a
/// ut_TaskScopedInvokeBody object so the functor will be invoked wrapped in
/// a UT_TaskScope that makes it safe to use UT_TaskLock objects that are
/// currently locked by the parent scope.
template <typename Body>
const ut_TaskScopedInvokeBody<Body>
UTmakeTaskScopedInvokeBody(const Body &body)
{
    return ut_TaskScopedInvokeBody<Body>(body);
}

/// UTparallelInvoke() executes the given functions in parallel when the
/// parallel flag is true - otherwise it runs them serially. F1 and F2
/// should be void functors.
template <typename F1, typename F2>
inline void UTparallelInvoke(bool parallel, F1 &&f1, F2 &&f2)
{
    if (parallel && UT_Thread::isThreadingEnabled())
    {
        tbb::parallel_invoke(UTmakeTaskScopedInvokeBody(std::forward<F1>(f1)),
                             UTmakeTaskScopedInvokeBody(std::forward<F2>(f2)));
    }
    else
    {
        f1();
        f2();
    }
}

template <typename F1, typename F2, typename... Rest>
inline void UTparallelInvoke(bool parallel, F1 &&f1, F2 &&f2, Rest&&... rest)
{
    if (parallel && UT_Thread::isThreadingEnabled())
    {
        tbb::parallel_invoke(UTmakeTaskScopedInvokeBody(std::forward<F1>(f1)),
                             UTmakeTaskScopedInvokeBody(std::forward<F2>(f2)),
                             UTmakeTaskScopedInvokeBody(std::forward<Rest>(rest))...);
    }
    else
    {
        f1();
        UTparallelInvoke(parallel, f2, std::forward<Rest>(rest)...);
    }
}
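
/// Example: a minimal sketch of running two independent tasks in parallel.
/// @c buildLeftTree and @c buildRightTree are hypothetical functions.
/// @code
///     UTparallelInvoke(/*parallel=*/true,
///         [&]() { buildLeftTree(); },
///         [&]() { buildRightTree(); });
/// @endcode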

template <typename F1>
class UT_ParallelInvokePointers
{
public:
    UT_ParallelInvokePointers(const UT_Array<F1 *> &functions)
        : myFunctions(functions) {}
    void operator()(const tbb::blocked_range<int> &r) const
    {
        for (int i = r.begin(); i != r.end(); ++i)
            (*myFunctions(i))();
    }
private:
    const UT_Array<F1 *> &myFunctions;
};

/// UTparallelInvoke() executes the array of functions in parallel when the
/// parallel flag is true - otherwise it runs them serially. F1 should be
/// a void functor.
template <typename F1>
inline void UTparallelInvoke(bool parallel, const UT_Array<F1 *> &funs)
{
    if (parallel && funs.entries() > 1 && UT_Thread::isThreadingEnabled())
    {
        UTparallelFor(tbb::blocked_range<int>(0, funs.entries(), 1),
                      UT_ParallelInvokePointers<F1>(funs),
                      32, 1); // oversubscribe to force forking
    }
    else
    {
        for (int i = 0; i < funs.entries(); i++)
            (*funs(i))();
    }
}

template <typename F1>
class UT_ParallelInvokeFunctors
{
public:
    UT_ParallelInvokeFunctors(const UT_Array<F1> &functions)
        : myFunctions(functions) {}
    void operator()(const tbb::blocked_range<int> &r) const
    {
        for (int i = r.begin(); i != r.end(); ++i)
            myFunctions(i)();
    }
private:
    const UT_Array<F1> &myFunctions;
};

/// UTparallelInvoke() executes the array of functions in parallel when the
/// parallel flag is true - otherwise it runs them serially. F1 should be
/// a void functor.
template <typename F1>
inline void UTparallelInvoke(bool parallel, const UT_Array<F1> &funs)
{
    if (parallel && funs.entries() > 1 && UT_Thread::isThreadingEnabled())
    {
        UTparallelFor(tbb::blocked_range<int>(0, funs.entries(), 1),
                      UT_ParallelInvokeFunctors<F1>(funs),
                      32, 1); // oversubscribe to force forking
    }
    else
    {
        for (int i = 0; i < funs.entries(); i++)
            funs(i)();
    }
}

/// Helper class for UTparallelReduce().
/// Wraps the thread body in a task scope so that thread stats are collected
/// by the performance monitor, and child tasks can inherit task scope locks
/// from the parent task.
template<typename Range, typename Body>
class ut_ReduceTaskScopedBody
{
public:
    // Construct from base type pointer, holds a pointer to it.
    ut_ReduceTaskScopedBody(Body *body)
        : myParentTaskScope(UT_TaskScope::getCurrent())
    {
        myBodyPtr = body;
    }

    ut_ReduceTaskScopedBody(ut_ReduceTaskScopedBody &src, UT_Split)
        : myParentTaskScope(src.myParentTaskScope)
        , myBodyPtr(nullptr)
    {
        UT_TaskScope task_scope(myParentTaskScope);
        myBody.emplace(src.body(), UT_Split());
    }

    void operator()(const Range &r)
    {
        UT_TaskScope task_scope(myParentTaskScope);
        body()(r);
    }

    void join(ut_ReduceTaskScopedBody &other)
    {
        UT_TaskScope task_scope(myParentTaskScope);
        body().join(other.body());
    }

    const Body &body() const { return myBodyPtr ? *myBodyPtr : *myBody; }
    Body &body() { return myBodyPtr ? *myBodyPtr : *myBody; }
private:
    UT_Optional<Body> myBody;
    Body *myBodyPtr;
    const UT_TaskScope *myParentTaskScope;
};

/// UTparallelReduce() is a simple wrapper that uses tbb for its implementation.
/// Run the @c body function over a range in parallel.
///
/// WARNING: The @c operator()() and @c join() functions MUST @b NOT initialize
///          data! @b Both of these functions MUST ONLY accumulate data! This
///          is because TBB may re-use body objects for multiple ranges.
///          Effectively, operator()() must act as an in-place join operation
///          for data as it comes in. Initialization must be kept to the
///          constructors of Body.
///
/// Requirements for the Body function are:
/// - @code Body::~Body(); @endcode @n
///   Destructor
/// - @code Body::Body(Body &r, UT_Split); @endcode @n
///   The splitting constructor.
///   WARNING: This must be able to run concurrently with calls to
///            @c r.operator()() and @c r.join(), so this should not copy
///            values accumulating in r.
/// - @code void Body::operator()(const Range &range); @endcode
///   Function call to perform operation on the range. Note the operator is
///   @b not const.
/// - @code void Body::join(const Body &other); @endcode
///   Join the results from another operation with this operation. Note the
///   operator is @b not const.
///
/// The requirements for a Range object are:
/// - @code Range::Range(const Range&); @endcode @n
///   Copy constructor
/// - @code Range::~Range(); @endcode @n
///   Destructor
/// - @code bool Range::is_divisible() const; @endcode @n
///   True if the range can be partitioned into two sub-ranges
/// - @code bool Range::empty() const; @endcode @n
///   True if the range is empty
/// - @code Range::Range(Range &r, UT_Split); @endcode @n
///   Split the range @c r into two sub-ranges (i.e. modify @c r and *this)
///
/// Example: @code
///     class Dot
///     {
///     public:
///         Dot(const fpreal *a, const fpreal *b)
///             : myA(a)
///             , myB(b)
///             , mySum(0)
///         {}
///         Dot(Dot &src, UT_Split)
///             : myA(src.myA)
///             , myB(src.myB)
///             , mySum(0)
///         {}
///         void operator()(const UT_BlockedRange<int64> &range)
///         {
///             for (int64 i = range.begin(); i != range.end(); ++i)
///                 mySum += myA[i] * myB[i];
///         }
///         void join(const Dot &other)
///         {
///             mySum += other.mySum;
///         }
///         fpreal mySum;
///         const fpreal *myA, *myB;
///     };
///
///     fpreal
///     parallel_dot(const fpreal *a, const fpreal *b, int64 length)
///     {
///         Dot body(a, b);
///         UTparallelReduce(UT_BlockedRange<int64>(0, length), body);
///         return body.mySum;
///     }
/// @endcode
/// @see UTparallelFor(), UT_BlockedRange()
template <typename Range, typename Body>
void UTparallelReduce(
        const Range &range,
        Body &body,
        const int subscribe_ratio = 2,
        const int min_grain_size = 1,
        const bool force_use_task_scope = true
)
{
    const size_t num_processors( UT_Thread::getNumProcessors() );

    UT_ASSERT( num_processors >= 1 );
    UT_ASSERT( min_grain_size >= 1 );
    UT_ASSERT( subscribe_ratio >= 0 );

    const size_t est_range_size( UTestimatedNumItems(range) );

    // Don't run on an empty range!
    if (est_range_size == 0)
        return;

    // Avoid tbb overhead if entire range needs to be single threaded
    if (num_processors == 1 || est_range_size <= min_grain_size ||
        !UT_Thread::isThreadingEnabled())
    {
        body(range);
        return;
    }

    size_t grain_size(min_grain_size);
    if( subscribe_ratio > 0 )
        grain_size = std::max(
                grain_size,
                est_range_size / (subscribe_ratio * num_processors)
        );

    UT_CoarsenedRange< Range > coarsened_range(range, grain_size);
    if (force_use_task_scope || UTperformanceIsRecordingThreadStats())
    {
        ut_ReduceTaskScopedBody<Range, Body> bodywrapper(&body);
        tbb::parallel_reduce(coarsened_range,
                             bodywrapper,
                             tbb::simple_partitioner());
    }
    else
    {
        tbb::parallel_reduce(coarsened_range, body, tbb::simple_partitioner());
    }
}

/// This is a simple wrapper for deterministic reduce that uses tbb. It
/// works in the same manner as UTparallelReduce, with the following
/// differences:
/// - reduction and join order is deterministic (devoid of threading
///   uncertainty);
/// - a fixed grain size must be provided by the caller; grain size is
///   not adjusted based on the available resources (this is required to
///   satisfy determinism).
/// This version should be used when task joining is not associative (such
/// as accumulation of a floating point residual).
template <typename Range, typename Body>
void UTparallelDeterministicReduce(
        const Range &range,
        Body &body,
        const int grain_size,
        const bool force_use_task_scope = true
)
{
    UT_ASSERT( grain_size >= 1 );

    const size_t est_range_size( UTestimatedNumItems(range) );

    // Don't run on an empty range!
    if (est_range_size == 0)
        return;

    UT_ASSERT_MSG(UT_Thread::isThreadingEnabled(),
        "FIXME: There needs to be a way to do identical splits and joins when single-threading,"
        " to avoid having different roundoff error from when multi-threading. "
        " Something using simple_partitioner() might work.");

    UT_CoarsenedRange< Range > coarsened_range(range, grain_size);
    if (force_use_task_scope || UTperformanceIsRecordingThreadStats())
    {
        ut_ReduceTaskScopedBody<Range, Body> bodywrapper(&body);
        tbb::parallel_deterministic_reduce(coarsened_range,
                                           bodywrapper,
                                           tbb::simple_partitioner());
    }
    else
    {
        tbb::parallel_deterministic_reduce(coarsened_range, body);
    }
}
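
/// Example: a sketch reusing the @c Dot body from the UTparallelReduce()
/// example above. The fixed grain size keeps the split/join pattern, and
/// hence the floating point roundoff, reproducible from run to run.
/// @code
///     Dot body(a, b);
///     UTparallelDeterministicReduce(UT_BlockedRange<int64>(0, length), body,
///                                   /*grain_size=*/1024);
///     fpreal sum = body.mySum;
/// @endcode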

/// Version of UTparallelReduce that is tuned for the case where the range
/// consists of lightweight items, for example, finding the min/max in a set of
/// integers.
template <typename Range, typename Body>
void UTparallelReduceLightItems(const Range &range, Body &body)
{
    UTparallelReduce(range, body, 2, 1024);
}

/// Version of UTparallelReduce that is tuned for the case where the range
/// consists of heavy items, for example, computing the bounding box of a list
/// of geometry objects.
template <typename Range, typename Body>
void UTparallelReduceHeavyItems(const Range &range, Body &body)
{
    UTparallelReduce(range, body, 0, 1);
}

/// UTserialReduce can be used as a debugging tool to quickly replace a
/// parallel reduce with a serial for.
template <typename Range, typename Body>
void UTserialReduce(const Range &range, Body &body)
        { body(range); }

/// Cancel the entire current task group context when run within a task
static inline void
UTparallelCancelGroupExecution()
{
    tbb::task::current_context()->cancel_group_execution();
}

/// UTparallelSort() is a simple wrapper that uses tbb for its implementation.
///
/// WARNING: UTparallelSort is UNSTABLE! You must explicitly force stability
/// if needed.
template <typename RandomAccessIterator, typename Compare>
void UTparallelSort(RandomAccessIterator begin, RandomAccessIterator end, const Compare &compare)
{
    if (UT_Thread::isThreadingEnabled())
        tbb::parallel_sort(begin, end, compare);
    else
        std::sort(begin, end, compare);
}

/// UTparallelSort() is a simple wrapper that uses tbb for its implementation.
///
/// WARNING: UTparallelSort is UNSTABLE! You must explicitly force stability
/// if needed.
template <typename RandomAccessIterator>
void UTparallelSort(RandomAccessIterator begin, RandomAccessIterator end)
{
    if (UT_Thread::isThreadingEnabled())
        tbb::parallel_sort(begin, end);
    else
        std::sort(begin, end);
}

/// UTparallelSort() is a simple wrapper that uses tbb for its implementation.
///
/// WARNING: UTparallelSort is UNSTABLE! You must explicitly force stability
/// if needed.
template <typename T>
void UTparallelSort(T *begin, T *end)
{
    if (UT_Thread::isThreadingEnabled())
        tbb::parallel_sort(begin, end);
    else
        std::sort(begin, end);
}
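
/// Example: a minimal sketch of an unstable parallel sort of a UT_Array with
/// a custom comparator (descending order); the array contents are assumed.
/// @code
///     UT_Array<int> values;    // assume this has been filled in
///     UTparallelSort(values.begin(), values.end(),
///                    [](int a, int b) { return a > b; });
/// @endcode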

// Forward declaration of parallel_stable_sort; implementation at end of file.
namespace pss
{
template<typename RandomAccessIterator, typename Compare>
void parallel_stable_sort( RandomAccessIterator xs, RandomAccessIterator xe,
                           Compare comp );

//! Wrapper for sorting with default comparator.
template<class RandomAccessIterator>
void parallel_stable_sort( RandomAccessIterator xs, RandomAccessIterator xe )
{
    typedef typename std::iterator_traits<RandomAccessIterator>::value_type T;
    parallel_stable_sort( xs, xe, std::less<T>() );
}
}

/// UTparallelStableSort() is a stable parallel merge sort.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
template <typename RandomAccessIterator, typename Compare>
void UTparallelStableSort(RandomAccessIterator begin, RandomAccessIterator end,
                          const Compare &compare)
{
    pss::parallel_stable_sort(begin, end, compare);
}

/// UTparallelStableSort() is a stable parallel merge sort.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
template <typename RandomAccessIterator>
void UTparallelStableSort(RandomAccessIterator begin, RandomAccessIterator end)
{
    pss::parallel_stable_sort(begin, end);
}

/// UTparallelStableSort() is a stable parallel merge sort.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
template <typename T>
void UTparallelStableSort(T *begin, T *end)
{
    pss::parallel_stable_sort(begin, end);
}

/// UTparallelStableSort() is a stable parallel merge sort.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
template <typename T, typename Compare>
void UTparallelStableSort(T *begin, T *end, const Compare &compare)
{
    pss::parallel_stable_sort(begin, end, compare);
}


/// UTparallelStableSort() is a stable parallel merge sort.
/// This form works with UT_Array and other containers with begin/end members.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
template <typename T>
void
UTparallelStableSort(T &a)
{
    pss::parallel_stable_sort(a.begin(), a.end());
}


/// UTparallelStableSort() is a stable parallel merge sort.
/// This form works with UT_Array and other containers with begin/end members.
///
/// NOTE: UTparallelStableSort requires a temporary buffer of size end-begin.
///       On allocation failure it falls back to calling @c std::stable_sort.
/// NOTE: Element initialization is done via @c std::move, so non-POD element
///       types should implement c++11 move semantics.
template <typename T, typename Compare>
void
UTparallelStableSort(T &a, const Compare &compare)
{
    pss::parallel_stable_sort(a.begin(), a.end(), compare);
}
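
/// Example: a minimal sketch of the container form; equal elements keep
/// their relative order. The array contents are assumed.
/// @code
///     UT_Array<exint> ids;     // assume this has been filled in
///     UTparallelStableSort(ids, [](exint a, exint b) { return a < b; });
/// @endcode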

/// UT_BlockedRange() is a simple wrapper using tbb for its implementation
/// This meets the requirements for a Range object, which are:
/// - @code Range::Range(const Range&); @endcode @n
///   Copy constructor
/// - @code Range::~Range(); @endcode @n
///   Destructor
/// - @code bool Range::is_divisible() const; @endcode @n
///   True if the range can be partitioned into two sub-ranges
/// - @code bool Range::empty() const; @endcode @n
///   True if the range is empty
/// - @code Range::Range(Range &r, UT_Split); @endcode @n
///   Split the range @c r into two sub-ranges (i.e. modify @c r and *this)
template <typename T>
class UT_BlockedRange : public tbb::blocked_range<T>
{
public:
    // TBB 2018 U3 no longer supports default blocked_range constructors
    UT_BlockedRange() = delete;

    UT_BlockedRange(T begin_value, T end_value, size_t grainsize=1)
        : tbb::blocked_range<T>(begin_value, end_value, grainsize)
    {}
    UT_BlockedRange(UT_BlockedRange &R, UT_Split split)
        : tbb::blocked_range<T>(R, split)
    {}


    // Because the VALUE of a blocked range may be a simple
    // type like int, the range-based for will fail to do a
    // dereference on it. This iterator-like wrapper will
    // allow * to work.
    class ValueWrapper
    {
    public:
        SYS_FORCE_INLINE
        explicit ValueWrapper(const T &it)
            : myCurrent(it)
        {}

        SYS_FORCE_INLINE
        T operator*() { return myCurrent; }

        SYS_FORCE_INLINE
        bool operator==(const ValueWrapper &cmp) const
        { return (myCurrent == cmp.myCurrent); }
        SYS_FORCE_INLINE
        bool operator!=(const ValueWrapper &cmp) const
        { return !(*this == cmp); }

        SYS_FORCE_INLINE
        ValueWrapper &operator++()
        {
            ++myCurrent;
            return *this;
        }
    private:
        T myCurrent;
    };

    // Allows for:
    //   for (T value : range.items())
    auto items() const
    {
        return UT_IteratorRange<ValueWrapper>(
                ValueWrapper(this->begin()), ValueWrapper(this->end()));
    }
};

/// UT_BlockedRange2D() is a simple wrapper using tbb for its implementation
/// This meets the requirements for a Range object, which are:
/// - @code Range::Range(const Range&); @endcode @n
///   Copy constructor
/// - @code Range::~Range(); @endcode @n
///   Destructor
/// - @code bool Range::is_divisible() const; @endcode @n
///   True if the range can be partitioned into two sub-ranges
/// - @code bool Range::empty() const; @endcode @n
///   True if the range is empty
/// - @code Range::Range(Range &r, UT_Split); @endcode @n
///   Split the range @c r into two sub-ranges (i.e. modify @c r and *this)
template <typename RowT, typename ColT>
class UT_BlockedRange2D : public tbb::blocked_range2d<RowT, ColT>
{
public:
    // TBB 2018 U3 no longer supports default blocked_range constructors
    UT_BlockedRange2D() = delete;

    /// NB: The arguments are in a different order than tbb
    UT_BlockedRange2D(RowT row_begin, RowT row_end,
            ColT col_begin, ColT col_end,
            size_t row_grainsize=1, size_t col_grainsize=1)
        : tbb::blocked_range2d<RowT, ColT>(row_begin, row_end, row_grainsize,
                                           col_begin, col_end, col_grainsize)
    {}
    UT_BlockedRange2D(UT_BlockedRange2D &R, UT_Split split)
        : tbb::blocked_range2d<RowT, ColT>(R, split)
    {}
};
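
/// Example: a minimal sketch of a parallel loop over a 2D range; the body
/// walks the row and column sub-ranges the splitter hands it. @c pixels,
/// @c nrows, and @c ncols are hypothetical.
/// @code
///     UTparallelFor(UT_BlockedRange2D<int>(0, nrows, 0, ncols),
///         [&](const UT_BlockedRange2D<int> &r)
///         {
///             for (int row = r.rows().begin(); row != r.rows().end(); ++row)
///                 for (int col = r.cols().begin(); col != r.cols().end(); ++col)
///                     pixels[row * ncols + col] = 0;
///         });
/// @endcode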

/// Performs a prefix sum across all the entries of the array.
/// I.e.,
///   for (int i = 1; i < array.entries(); i++)
///       array(i) = OP(array(i-1), array(i));
/// tbb has this as tbb::parallel_scan but does not guarantee determinism.
/// Note determinism is based on grain size, so that must be fixed.
template <typename Op, typename T>
void
UTparallelDeterministicPrefixSumInPlace(
        UT_Array<T> &array,
        const T identity,
        const Op &op,
        const int grain_size = 1024,
        const bool force_use_task_scope = true
)
{
    // Check serial. We need to have enough grains to make
    // this worthwhile.
    if (array.entries() < grain_size * 10)
    {
        T total = identity;
        for (exint i = 0, n = array.entries(); i < n; i++)
        {
            total = op(total, array(i));
            array(i) = total;
        }
        return;
    }

    // We could use the actual destination array to store the block
    // totals with some cleverness... For example, perhaps a stride &
    // offset so we could still recurse on prefix summing those totals?
    UT_Array<T> blocktotals;
    exint nblocks = (array.entries() + grain_size-1) / grain_size;
    blocktotals.setSizeNoInit(nblocks);

    // Scan for total for each block & compute the prefix sum
    // within the block
    UTparallelForEachNumber(nblocks, [&](const UT_BlockedRange<exint> &r)
    {
        for (exint block = r.begin(); block < r.end(); block++)
        {
            exint start = block * grain_size;
            exint end = SYSmin((block+1)*grain_size, array.entries());
            T total = identity;
            for (exint i = start; i < end; i++)
            {
                total = op(total, array(i));
                array(i) = total;
            }
            // TODO: False sharing here?
            blocktotals(block) = total;
        }
    }, force_use_task_scope);

    // Prefix sum our block totals.
    UTparallelDeterministicPrefixSumInPlace(blocktotals,
            identity, op,
            grain_size, force_use_task_scope);

    // Apply them back...
    UTparallelForEachNumber(nblocks, [&](const UT_BlockedRange<exint> &r)
    {
        for (exint block = r.begin(); block < r.end(); block++)
        {
            exint start = block * grain_size;
            exint end = SYSmin((block+1)*grain_size, array.entries());
            if (block > 0)
            {
                T total = blocktotals(block-1);
                for (exint i = start; i < end; i++)
                {
                    array(i) = op(total, array(i));
                }
            }
        }
    }, force_use_task_scope);
}
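
/// Example: a minimal sketch of an in-place running sum, so that counts(i)
/// becomes counts(0) + ... + counts(i). The array contents are assumed.
/// @code
///     UT_Array<exint> counts;  // assume this has been filled in
///     UTparallelDeterministicPrefixSumInPlace(counts, (exint)0,
///         [](exint a, exint b) { return a + b; });
/// @endcode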

/// @{
/// Wrapper around TBB's task isolation. In versions of TBB that don't support
/// isolate, this uses a task arena.
#if TBB_VERSION_MAJOR >= 2018
template <typename F> static inline void
UTisolate(F &f) { tbb::this_task_arena::isolate(f); }

template <typename F> static inline void
UTisolate(const F &f) { tbb::this_task_arena::isolate(f); }
#else
template <typename F> static inline void
UTisolate(F &f)
{
    tbb::task_arena __nested;
    __nested.execute(f);
}
template <typename F> static inline void
UTisolate(const F &f)
{
    tbb::task_arena __nested;
    __nested.execute(f);
}
#endif
/// @}
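
/// Example: a minimal sketch of isolating a nested parallel loop so a worker
/// that blocks inside it does not steal unrelated outer-level tasks while it
/// waits. @c nchunks and @c processChunk are hypothetical.
/// @code
///     UTisolate([&]()
///     {
///         UTparallelFor(UT_BlockedRange<int>(0, nchunks),
///             [&](const UT_BlockedRange<int> &r)
///             {
///                 for (int i = r.begin(); i != r.end(); ++i)
///                     processChunk(i);
///             });
///     });
/// @endcode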

// The code below is originally from:
// https://software.intel.com/en-us/articles/a-parallel-stable-sort-using-c11-for-tbb-cilk-plus-and-openmp
// and is covered by the following copyright:
/*
    Copyright (C) 2014 Intel Corporation
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in
      the documentation and/or other materials provided with the
      distribution.
    * Neither the name of Intel Corporation nor the names of its
      contributors may be used to endorse or promote products derived
      from this software without specific prior written permission.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
    A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
    HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
    OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
    AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
    WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/
#include <utility>
#include <iterator>
#include <algorithm>

namespace pss {

namespace internal {

//! Destroy sequence [zs,ze)
template<class RandomAccessIterator>
void serial_destroy( RandomAccessIterator zs, RandomAccessIterator ze ) {
    typedef typename std::iterator_traits<RandomAccessIterator>::value_type T;
    while( zs!=ze ) {
        --ze;
        (*ze).~T();
    }
}

//! Merge sequences [xs,xe) and [ys,ye) to output sequence [zs,(xe-xs)+(ye-ys)), using std::move
template<class RandomAccessIterator1, class RandomAccessIterator2, class RandomAccessIterator3, class Compare>
void serial_move_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye, RandomAccessIterator3 zs, Compare comp ) {
    if( xs!=xe ) {
        if( ys!=ye )
        {
            for(;;)
            {
                if( comp(*ys,*xs) ) {
                    *zs = std::move(*ys);
                    ++zs;
                    if( ++ys==ye ) break;
                } else {
                    *zs = std::move(*xs);
                    ++zs;
                    if( ++xs==xe ) goto movey;
                }
            }
        }
        ys = xs;
        ye = xe;
    }
movey:
    std::move( ys, ye, zs );
}

template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename Compare>
void stable_sort_base_case( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp) {
    std::stable_sort( xs, xe, comp );
    if( inplace!=2 ) {
        RandomAccessIterator2 ze = zs + (xe-xs);
        typedef typename std::iterator_traits<RandomAccessIterator2>::value_type T;
        if( inplace )
            // Initialize the temporary buffer
            for( ; zs<ze; ++zs )
                new(&*zs) T;
        else
            // Initialize the temporary buffer and move keys to it.
            for( ; zs<ze; ++xs, ++zs )
                new(&*zs) T(std::move(*xs));
    }
}

//! Raw memory buffer with automatic cleanup.
class raw_buffer
{
    void* ptr;
public:
    //! Try to obtain buffer of given size.
    raw_buffer( size_t bytes ) : ptr( operator new(bytes,std::nothrow) ) {}
    //! True if buffer was successfully obtained, zero otherwise.
    operator bool() const {return ptr;}
    //! Return pointer to buffer, or NULL if buffer could not be obtained.
    void* get() const {return ptr;}
    //! Destroy buffer
    ~raw_buffer() {operator delete(ptr);}
};

template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename RandomAccessIterator3, typename Compare>
void parallel_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys,
                     RandomAccessIterator2 ye, RandomAccessIterator3 zs, bool destroy, Compare comp );

template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename RandomAccessIterator3, typename Compare>
struct parallel_merge_invoke
{
    RandomAccessIterator1 _xs, _xe;
    RandomAccessIterator2 _ys, _ye;
    RandomAccessIterator3 _zs;
    bool _destroy;
    Compare _comp;
    parallel_merge_invoke( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye,
                           RandomAccessIterator3 zs, bool destroy, Compare comp):
        _xs(xs), _xe(xe), _ys(ys), _ye(ye), _zs(zs), _destroy(destroy), _comp(comp) {}

    void operator()() const { parallel_merge( _xs, _xe, _ys, _ye, _zs, _destroy, _comp ); }
};

// Merge sequences [xs,xe) and [ys,ye) to output sequence [zs,zs+(xe-xs)+(ye-ys))
template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename RandomAccessIterator3, typename Compare>
void parallel_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys,
                     RandomAccessIterator2 ye, RandomAccessIterator3 zs, bool destroy, Compare comp ) {
    const size_t MERGE_CUT_OFF = 2000;
    if( (xe-xs) + (ye-ys) <= MERGE_CUT_OFF ) {
        serial_move_merge( xs, xe, ys, ye, zs, comp );
        if( destroy ) {
            serial_destroy( xs, xe );
            serial_destroy( ys, ye );
        }
    } else {
        RandomAccessIterator1 xm;
        RandomAccessIterator2 ym;
        if( xe-xs < ye-ys ) {
            ym = ys+(ye-ys)/2;
            xm = std::upper_bound(xs,xe,*ym,comp);
        } else {
            xm = xs+(xe-xs)/2;
            ym = std::lower_bound(ys,ye,*xm,comp);
        }
        RandomAccessIterator3 zm = zs + ((xm-xs) + (ym-ys));
        tbb::parallel_invoke(
            parallel_merge_invoke<RandomAccessIterator1, RandomAccessIterator2, RandomAccessIterator3, Compare>( xs, xm, ys, ym, zs, destroy, comp ),
            parallel_merge_invoke<RandomAccessIterator1, RandomAccessIterator2, RandomAccessIterator3, Compare>( xm, xe, ym, ye, zm, destroy, comp ));
    }
}

template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename Compare>
void parallel_stable_sort_aux( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp );

template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename Compare>
struct parallel_stable_sort_aux_invoke
{
    RandomAccessIterator1 _xs, _xe;
    RandomAccessIterator2 _zs;
    bool _inplace;
    Compare _comp;
    parallel_stable_sort_aux_invoke( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp ):
        _xs(xs), _xe(xe), _zs(zs), _inplace(inplace), _comp(comp) {}

    void operator()() const { parallel_stable_sort_aux( _xs, _xe, _zs, _inplace, _comp ); }
};

// Sorts [xs,xe), where zs[0:xe-xs) is temporary buffer supplied by caller.
// Result is in [xs,xe) if inplace==true, otherwise in [zs,zs+(xe-xs))
template<typename RandomAccessIterator1, typename RandomAccessIterator2, typename Compare>
void parallel_stable_sort_aux( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp ) {
    const size_t SORT_CUT_OFF = 500;
    if( xe-xs<=SORT_CUT_OFF ) {
        stable_sort_base_case(xs, xe, zs, inplace, comp);
    } else {
        RandomAccessIterator1 xm = xs + (xe-xs)/2;
        RandomAccessIterator2 zm = zs + (xm-xs);
        RandomAccessIterator2 ze = zs + (xe-xs);
        tbb::parallel_invoke(
            parallel_stable_sort_aux_invoke<RandomAccessIterator1, RandomAccessIterator2, Compare>( xs, xm, zs, !inplace, comp ),
            parallel_stable_sort_aux_invoke<RandomAccessIterator1, RandomAccessIterator2, Compare>( xm, xe, zm, !inplace, comp ));
        if( inplace )
            parallel_merge( zs, zm, zm, ze, xs, inplace==2, comp );
        else
            parallel_merge( xs, xm, xm, xe, zs, false, comp );
    }
}
} // namespace internal

template<typename RandomAccessIterator, typename Compare>
void parallel_stable_sort( RandomAccessIterator xs, RandomAccessIterator xe, Compare comp ) {
    typedef typename std::iterator_traits<RandomAccessIterator>::value_type T;
    internal::raw_buffer z = internal::raw_buffer( sizeof(T)*(xe-xs) );
    if( z && UT_Thread::isThreadingEnabled() )
        internal::parallel_stable_sort_aux( xs, xe, (T*)z.get(), 2, comp );
    else
        // Not enough memory available - fall back on serial sort
        std::stable_sort( xs, xe, comp );
}

} // namespace pss


#endif