32 #ifndef __UT_ParallelUtil__
33 #define __UT_ParallelUtil__
45 #include <oneapi/tbb/blocked_range.h>
46 #include <oneapi/tbb/blocked_range2d.h>
47 #include <oneapi/tbb/parallel_for.h>
48 #include <oneapi/tbb/parallel_reduce.h>
49 #include <oneapi/tbb/parallel_sort.h>
50 #include <oneapi/tbb/task.h>
51 #include <oneapi/tbb/task_arena.h>
60 template <
typename RowT,
typename ColT=RowT>
64 template<
typename RANGE >
76 template<
typename T >
83 return range.rows().size() * range.cols().size();
88 template <
typename RANGE>
100 template<
typename RANGE >
111 myGrainSize(range.myGrainSize)
120 RANGE::is_divisible() &&
129 myGrainSize(grain_size)
133 template <
typename Range,
typename Body>
135 const Range &
range,
const Body &body,
136 const int subscribe_ratio,
const int min_grain_size,
137 const bool force_use_task_scope
139 template <
typename Range,
typename Body>
141 const Range &
range, Body &body,
142 const int subscribe_ratio,
const int min_grain_size,
143 const bool force_use_taskscope
145 template <
typename Range,
typename Body>
147 const Range &
range, Body &body,
const int grain_size,
148 const bool force_use_taskscope
156 template<
typename Range,
typename Body>
167 : myBody(src.myBody),
168 myParentTaskScope(src.myParentTaskScope)
186 template<
typename Range,
typename Body>
202 template <
typename IntType,
typename Body>
293 template <
typename Range,
typename Body>
295 const Range &
range,
const Body &body,
296 const int subscribe_ratio = 2,
297 const int min_grain_size = 1,
298 const bool force_use_task_scope =
true
310 if (est_range_size == 0)
314 if (num_processors == 1 || est_range_size <= min_grain_size ||
321 size_t grain_size(min_grain_size);
322 if( subscribe_ratio > 0 )
325 est_range_size / (subscribe_ratio * num_processors)
334 tbb::simple_partitioner());
340 tbb::simple_partitioner());
346 template <
typename Range,
typename Body>
348 const Range &
range,
const Body &body,
349 const int subscribe_ratio = 2,
350 const int min_grain_size = 1
353 UTparallelFor(range, body, subscribe_ratio, min_grain_size,
true);
359 template <
typename Range,
typename Body>
362 const bool force_use_task_scope =
true)
379 template <
typename Range,
typename Body>
403 template <
typename IntType,
typename Body>
412 if (num_processors == 1)
417 if (nitems <= num_processors)
434 template <
typename IntType,
typename Body>
438 for (IntType i = 0; i < nitems; ++i)
445 template <
typename IntType,
typename Body>
454 template <
typename Range,
typename Body>
462 template<
typename Body>
473 : myBody(src.myBody),
474 myParentTaskScope(src.myParentTaskScope)
493 template <
typename Body>
503 template <
typename F1,
typename F2>
518 template <
typename F1,
typename F2,
typename... Rest>
534 template <
typename F1>
539 : myFunctions(functions) {}
542 for (
int i = r.begin(); i != r.end(); ++i)
552 template <
typename F1>
563 for (
int i = 0; i < funs.
entries(); i++)
568 template <
typename F1>
573 : myFunctions(functions) {}
576 for (
int i = r.begin(); i != r.end(); ++i)
586 template <
typename F1>
597 for (
int i = 0; i < funs.
entries(); i++)
606 template<
typename Range,
typename Body>
618 : myParentTaskScope(src.myParentTaskScope)
637 const Body &
body()
const {
return myBodyPtr ? *myBodyPtr : *myBody; }
638 Body &
body() {
return myBodyPtr ? *myBodyPtr : *myBody; }
718 template <
typename Range,
typename Body>
722 const int subscribe_ratio = 2,
723 const int min_grain_size = 1,
724 const bool force_use_task_scope =
true
736 if (est_range_size == 0)
740 if (num_processors == 1 || est_range_size <= min_grain_size ||
747 size_t grain_size(min_grain_size);
748 if( subscribe_ratio > 0 )
751 est_range_size / (subscribe_ratio * num_processors)
758 tbb::parallel_reduce(coarsened_range,
760 tbb::simple_partitioner());
764 tbb::parallel_reduce(coarsened_range, body, tbb::simple_partitioner());
778 template <
typename Range,
typename Body>
782 const int grain_size,
783 const bool force_use_task_scope =
true
791 if (est_range_size == 0)
795 "FIXME: There needs to be a way to do identical splits and joins when single-threading,"
796 " to avoid having different roundoff error from when multi-threading. "
797 " Something using simple_partitioner() might work.");
803 tbb::parallel_deterministic_reduce(coarsened_range,
805 tbb::simple_partitioner());
809 tbb::parallel_deterministic_reduce(coarsened_range, body);
816 template <
typename Range,
typename Body>
825 template <
typename Range,
typename Body>
833 template <
typename Range,
typename Body>
839 UTparallelCancelGroupExecution()
841 tbb::task::current_context()->cancel_group_execution();
848 template <
typename RandomAccessIterator,
typename Compare>
852 tbb::parallel_sort(begin, end, compare);
861 template <
typename RandomAccessIterator>
865 tbb::parallel_sort(begin, end);
874 template <
typename T>
878 tbb::parallel_sort(begin, end);
886 template<
typename RandomAccessIterator,
typename Compare>
891 template<
class RandomAccessIterator>
905 template <
typename RandomAccessIterator,
typename Compare>
918 template <
typename RandomAccessIterator>
930 template <
typename T>
942 template <
typename T,
typename Compare>
956 template <
typename T>
971 template <
typename T,
typename Compare>
990 template <
typename T>
998 : tbb::blocked_range<
T>(begin_value, end_value, grainsize)
1001 : tbb::blocked_range<
T>(R, split)
1022 {
return (myCurrent == cmp.myCurrent); }
1025 {
return !(*
this ==
cmp); }
1058 template <
typename RowT,
typename ColT>
1067 ColT col_begin, ColT col_end,
1068 size_t row_grainsize=1,
size_t col_grainsize=1)
1069 : tbb::blocked_range2d<RowT, ColT>(row_begin, row_end, row_grainsize,
1070 col_begin, col_end, col_grainsize)
1073 : tbb::blocked_range2d<RowT, ColT>(R, split)
1083 template <
typename Op,
typename T>
1089 const int grain_size = 1024,
1090 const bool force_use_task_scope =
true
1095 if (array.
entries() < grain_size * 10)
1100 total =
op(total, array(i));
1110 exint nblocks = (array.
entries() + grain_size-1) / grain_size;
1117 for (
exint block = r.begin(); block < r.end(); block++)
1122 for (
exint i = start; i <
end; i++)
1124 total =
op(total, array(i));
1128 blocktotals(block) = total;
1130 }, force_use_task_scope);
1135 grain_size, force_use_task_scope);
1140 for (
exint block = r.begin(); block < r.end(); block++)
1146 T total = blocktotals(block-1);
1147 for (
exint i = start; i <
end; i++)
1149 array(i) =
op(total, array(i));
1153 }, force_use_task_scope);
1160 #if TBB_VERSION_MAJOR >= 2018
1161 template <
typename F>
static inline void
1162 UTisolate(F &
f) { tbb::this_task_arena::isolate(f); }
1164 template <
typename F>
static inline void
1165 UTisolate(
const F &
f) { tbb::this_task_arena::isolate(f); }
1167 template <
typename F>
static inline void
1170 tbb::task_arena __nested;
1171 __nested.execute(f);
1173 template <
typename F>
static inline void
1174 UTisolate(
const F &f)
1176 tbb::task_arena __nested;
1177 __nested.execute(f);
1218 #include <algorithm>
1222 namespace internal {
1225 template<
class RandomAccessIterator>
1235 template<
class RandomAccessIterator1,
class RandomAccessIterator2,
class RandomAccessIterator3,
class Compare>
1236 void serial_move_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye, RandomAccessIterator3 zs, Compare comp ) {
1242 if( comp(*ys,*xs) ) {
1243 *zs = std::move(*ys);
1245 if( ++ys==ye )
break;
1247 *zs = std::move(*xs);
1249 if( ++xs==xe )
goto movey;
1257 std::move( ys, ye, zs );
1260 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename Compare>
1261 void stable_sort_base_case( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs,
int inplace, Compare comp) {
1262 std::stable_sort( xs, xe, comp );
1264 RandomAccessIterator2 ze = zs + (xe-xs);
1268 for( ; zs<ze; ++zs )
1272 for( ; zs<ze; ++xs, ++zs )
1273 new(&*zs) T(std::move(*xs));
1285 operator bool()
const {
return ptr;}
1287 void*
get()
const {
return ptr;}
1292 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename RandomAccessIterator3,
typename Compare>
1293 void parallel_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys,
1294 RandomAccessIterator2 ye, RandomAccessIterator3 zs,
bool destroy, Compare comp );
1296 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename RandomAccessIterator3,
typename Compare>
1304 parallel_merge_invoke( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye,
1305 RandomAccessIterator3 zs,
bool destroy, Compare comp):
1313 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename RandomAccessIterator3,
typename Compare>
1314 void parallel_merge( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys,
1315 RandomAccessIterator2 ye, RandomAccessIterator3 zs,
bool destroy, Compare comp ) {
1316 const size_t MERGE_CUT_OFF = 2000;
1317 if( (xe-xs) + (ye-ys) <= MERGE_CUT_OFF ) {
1324 RandomAccessIterator1 xm;
1325 RandomAccessIterator2 ym;
1326 if( xe-xs < ye-ys ) {
1328 xm = std::upper_bound(xs,xe,*ym,comp);
1331 ym = std::lower_bound(ys,ye,*xm,comp);
1333 RandomAccessIterator3 zm = zs + ((xm-xs) + (ym-ys));
1334 tbb::parallel_invoke(
parallel_merge_invoke<RandomAccessIterator1, RandomAccessIterator2, RandomAccessIterator3, Compare>( xs, xm, ys, ym, zs, destroy, comp ),
1339 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename Compare>
1340 void parallel_stable_sort_aux( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs,
int inplace, Compare comp );
1342 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename Compare>
1358 template<
typename RandomAccessIterator1,
typename RandomAccessIterator2,
typename Compare>
1359 void parallel_stable_sort_aux( RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs,
int inplace, Compare comp ) {
1360 const size_t SORT_CUT_OFF = 500;
1361 if( xe-xs<=SORT_CUT_OFF ) {
1364 RandomAccessIterator1 xm = xs + (xe-xs)/2;
1365 RandomAccessIterator2 zm = zs + (xm-xs);
1366 RandomAccessIterator2 ze = zs + (xe-xs);
1377 template<
typename RandomAccessIterator,
typename Compare>
1385 std::stable_sort( xs, xe, comp );
ut_TaskScopedInvokeBody(const Body &body)
void UTparallelSort(RandomAccessIterator begin, RandomAccessIterator end, const Compare &compare)
UT_BlockedRange2D()=delete
SYS_FORCE_INLINE bool operator==(const ValueWrapper &cmp) const
UT_BlockedRange(T begin_value, T end_value, size_t grainsize=1)
SYS_FORCE_INLINE ValueWrapper & operator++()
void UTparallelFor(const Range &range, const Body &body, const int subscribe_ratio=2, const int min_grain_size=1, const bool force_use_task_scope=true)
void UTparallelDeterministicReduce(const Range &range, Body &body, const int grain_size, const bool force_use_task_scope=true)
size_t operator()(const RANGE &range) const
void UTparallelForTaskScope(const Range &range, const Body &body, const int subscribe_ratio=2, const int min_grain_size=1)
SYS_FORCE_INLINE bool operator!=(const ValueWrapper &cmp) const
friend void UTparallelDeterministicReduce(const Range &range, Body &body, const int grain_size, const bool force_use_taskscope)
void operator()(const Range &r)
void UTparallelForEachNumber(IntType nitems, const Body &body, const bool force_use_task_scope=true)
void UTparallelDeterministicPrefixSumInPlace(UT_Array< T > &array, const T identity, const Op &op, const int grain_size=1024, const bool force_use_task_scope=true)
void setSizeNoInit(exint newsize)
void UTserialReduce(const Range &range, Body &body)
ut_ReduceTaskScopedBody(Body *body)
CompareResults OIIO_API compare(const ImageBuf &A, const ImageBuf &B, float failthresh, float warnthresh, float failrelative, float warnrelative, ROI roi={}, int nthreads=0)
RandomAccessIterator1 _xe
GLdouble GLdouble GLdouble z
RandomAccessIterator1 _xe
GLboolean GLboolean GLboolean GLboolean a
void serial_destroy(RandomAccessIterator zs, RandomAccessIterator ze)
Destroy sequence [zs,ze)
PUGI__FN void sort(I begin, I end, const Pred &pred)
void UTparallelForLightItems(const Range &range, const Body &body, const bool force_use_task_scope=true)
void UTserialForEachNumber(IntType nitems, const Body &body, bool usetaskscope=true)
RandomAccessIterator2 _ye
void parallel_stable_sort_aux(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp)
static bool isThreadingEnabled()
ut_ReduceTaskScopedBody(ut_ReduceTaskScopedBody &src, UT_Split)
std::optional< T > UT_Optional
size_t UTestimatedNumItems(const RANGE &range)
This is needed by UT_CoarsenedRange.
IMATH_HOSTDEVICE constexpr int cmp(T a, T b) IMATH_NOEXCEPT
size_t operator()(const UT_BlockedRange2D< T > &range) const
#define UT_ASSERT_MSG(ZZ,...)
#define SYS_DEPRECATED_REPLACE(__V__, __R__)
void join(ut_ReduceTaskScopedBody &other)
RandomAccessIterator2 _ys
UT_ParallelInvokeFunctors(const UT_Array< F1 > &functions)
RandomAccessIterator2 _zs
Raw memory buffer with automatic cleanup.
void parallel_merge(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye, RandomAccessIterator3 zs, bool destroy, Compare comp)
ut_TaskBody(const Body *body)
ut_TaskScopedInvokeBody(const ut_TaskScopedInvokeBody &src)
~raw_buffer()
Destroy buffer.
void operator()(const UT_BlockedRange< IntType > &range) const
ut_TaskScopedBody(const ut_TaskScopedBody &src)
const Body & body() const
static int getNumProcessors()
void UTparallelReduceHeavyItems(const Range &range, Body &body)
OIIO_UTIL_API void parallel_for(int32_t begin, int32_t end, function_view< void(int32_t)> task, paropt opt=0)
UT_BlockedRange2D(RowT row_begin, RowT row_end, ColT col_begin, ColT col_end, size_t row_grainsize=1, size_t col_grainsize=1)
NB: The arguments are in a different order than tbb.
SYS_FORCE_INLINE T operator*()
RandomAccessIterator1 _xs
tbb::split UT_Split
Typedef to denote the "split" constructor of a range.
void operator()(const tbb::blocked_range< int > &r) const
friend void UTparallelFor(const Range &range, const Body &body, const int subscribe_ratio, const int min_grain_size, const bool force_use_task_scope)
UT_BlockedRange(UT_BlockedRange &R, UT_Split split)
void operator()(const Range &r) const
void operator()(const tbb::blocked_range< int > &r) const
ut_TaskScopedBody(const Body *body)
exint entries() const
Alias of size(). size() is preferred.
void operator()(const Range &r) const
UT_ParallelInvokePointers(const UT_Array< F1 * > &functions)
void UTparallelInvoke(bool parallel, F1 &&f1, F2 &&f2)
void UTparallelStableSort(RandomAccessIterator begin, RandomAccessIterator end, const Compare &compare)
RandomAccessIterator3 _zs
raw_buffer(size_t bytes)
Try to obtain buffer of given size.
void parallel_stable_sort(RandomAccessIterator xs, RandomAccessIterator xe, Compare comp)
ImageBuf OIIO_API max(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
void * get() const
Return pointer to buffer, or NULL if buffer could not be obtained.
parallel_stable_sort_aux_invoke(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp)
UT_BlockedRange2D(UT_BlockedRange2D &R, UT_Split split)
void serial_move_merge(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye, RandomAccessIterator3 zs, Compare comp)
Merge sequences [xs,xe) and [ys,ye) to output sequence [zs,zs+(xe-xs)+(ye-ys)), using std::move...
void UTparallelForHeavyItems(const Range &range, const Body &body)
RandomAccessIterator1 _xs
void OIIO_UTIL_API split(string_view str, std::vector< string_view > &result, string_view sep=string_view(), int maxsplit=-1)
ut_ForEachNumberBody(const Body &body, SYS_AtomicInt< IntType > &it, IntType end)
void stable_sort_base_case(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 zs, int inplace, Compare comp)
void UTparallelForEachNumberTaskScope(IntType nitems, const Body &body)
UT_CoarsenedRange(UT_CoarsenedRange &range, tbb::split spl)
GA_API const UT_StringHolder rest
const ut_TaskScopedInvokeBody< Body > UTmakeTaskScopedInvokeBody(const Body &body)
SYS_FORCE_INLINE ValueWrapper(const T &it)
void UTparallelReduce(const Range &range, Body &body, const int subscribe_ratio=2, const int min_grain_size=1, const bool force_use_task_scope=true)
friend void UTparallelReduce(const Range &range, Body &body, const int subscribe_ratio, const int min_grain_size, const bool force_use_taskscope)
void UTserialFor(const Range &range, const Body &body)
bool is_divisible() const
void UTparallelReduceLightItems(const Range &range, Body &body)
parallel_merge_invoke(RandomAccessIterator1 xs, RandomAccessIterator1 xe, RandomAccessIterator2 ys, RandomAccessIterator2 ye, RandomAccessIterator3 zs, bool destroy, Compare comp)
PcpNodeRef_ChildrenIterator begin(const PcpNodeRef::child_const_range &r)
Support for range-based for loops for PcpNodeRef children ranges.