HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
dispatcher.h
Go to the documentation of this file.
1 //
2 // Copyright 2016 Pixar
3 //
4 // Licensed under the terms set forth in the LICENSE.txt file available at
5 // https://openusd.org/license.
6 //
7 #ifndef PXR_BASE_WORK_DISPATCHER_H
8 #define PXR_BASE_WORK_DISPATCHER_H
9 
10 /// \file work/dispatcher.h
11 
12 #include "pxr/pxr.h"
14 #include "pxr/base/work/api.h"
15 
16 #include "pxr/base/tf/errorMark.h"
18 
19 // Blocked range is not used in this file, but this header happens to pull in
20 // the TBB version header in a way that works in all TBB versions.
21 #include <tbb/blocked_range.h>
22 #include <tbb/concurrent_vector.h>
23 #if TBB_INTERFACE_VERSION_MAJOR >= 12
24 #include <tbb/task_group.h>
25 #else
26 #include <tbb/task.h>
27 #endif
28 
29 #include <functional>
30 #include <type_traits>
31 #include <utility>
32 
34 
35 /// \class WorkDispatcher
36 ///
37 /// A work dispatcher runs concurrent tasks. The dispatcher supports adding
38 /// new tasks from within running tasks. This suits problems that exhibit
39 /// hierarchical structured parallelism: tasks that discover additional tasks
40 /// during their execution.
41 ///
42 /// Typical use is to create a dispatcher and invoke Run() to begin doing
43 /// work, then Wait() for the work to complete. Tasks may invoke Run() during
44 /// their execution as they discover additional tasks to perform.
45 ///
46 /// For example,
47 ///
48 /// \code
49 /// WorkDispatcher dispatcher;
50 /// for (i = 0; i != N; ++i) {
51 /// dispatcher.Run(DoSomeWork, workItem[i]);
52 /// }
53 /// dispatcher.Wait();
54 /// \endcode
55 ///
56 /// Calls to Run() and Cancel() may be made concurrently. Calls to Wait() may
57 /// also be made concurrently. However, once any calls to Wait() are in-flight,
58 /// calls to Run() and Cancel() must only be made by tasks already added by
59 /// Run(). This means that users of this class are responsible to synchronize
60 /// concurrent calls to Wait() to ensure this requirement is met.
61 ///
62 /// Additionally, Wait() must never be called by a task added by Run(), since
63 /// that task could never complete.
64 ///
66 {
67 public:
68  /// Construct a new dispatcher.
70 
71  /// Wait() for any pending tasks to complete, then destroy the dispatcher.
72  WORK_API ~WorkDispatcher() noexcept;
73 
74  WorkDispatcher(WorkDispatcher const &) = delete;
75  WorkDispatcher &operator=(WorkDispatcher const &) = delete;
76 
77 #ifdef doxygen
78 
79  /// Add work for the dispatcher to run.
80  ///
81  /// Before a call to Wait() is made it is safe for any client to invoke
82  /// Run(). Once Wait() is invoked, it is \b only safe to invoke Run() from
83  /// within the execution of tasks already added via Run().
84  ///
85  /// This function does not block, in general. It may block if concurrency
86  /// is limited to 1. The added work may be not yet started, may be started
87  /// but not completed, or may be completed upon return. No guarantee is
88  /// made.
89  template <class Callable, class A1, class A2, ... class AN>
90  void Run(Callable &&c, A1 &&a1, A2 &&a2, ... AN &&aN);
91 
92 #else // doxygen
93 
94  template <class Callable>
95  inline void Run(Callable &&c) {
96 #if TBB_INTERFACE_VERSION_MAJOR >= 12
97  _taskGroup.run(_InvokerTask<typename std::remove_reference<Callable>::type>(std::forward<Callable>(c), &_errors));
98 #else
99  _rootTask->spawn(_MakeInvokerTask(std::forward<Callable>(c)));
100 #endif
101  }
102 
103  template <class Callable, class A0, class ... Args>
104  inline void Run(Callable &&c, A0 &&a0, Args&&... args) {
105  Run(std::bind(std::forward<Callable>(c),
106  std::forward<A0>(a0),
107  std::forward<Args>(args)...));
108  }
109 
110 #endif // doxygen
111 
112  /// Block until the work started by Run() completes.
113  WORK_API void Wait();
114 
115  /// Cancel remaining work and return immediately.
116  ///
117  /// Calling this function affects task that are being run directly
118  /// by this dispatcher. If any of these tasks are using their own
119  /// dispatchers to run tasks, these dispatchers will not be affected
120  /// and these tasks will run to completion, unless they are also
121  /// explicitly cancelled.
122  ///
123  /// This call does not block. Call Wait() after Cancel() to wait for
124  /// pending tasks to complete.
125  WORK_API void Cancel();
126 
127  /// Returns true if Cancel() has been called. Calling Wait() will reset the
128  /// cancel state.
129  WORK_API bool IsCancelled() const;
130 
131 private:
132  typedef tbb::concurrent_vector<TfErrorTransport> _ErrorTransports;
133 
134  // Function invoker helper that wraps the invocation with an ErrorMark so we
135  // can transmit errors that occur back to the thread that Wait() s for tasks
136  // to complete.
137 #if TBB_INTERFACE_VERSION_MAJOR >= 12
138  template <class Fn>
139  struct _InvokerTask {
140  explicit _InvokerTask(Fn &&fn, _ErrorTransports *err)
141  : _fn(std::move(fn)), _errors(err) {}
142 
143  explicit _InvokerTask(Fn const &fn, _ErrorTransports *err)
144  : _fn(fn), _errors(err) {}
145 
146  // Ensure only moves happen, no copies.
147  _InvokerTask(_InvokerTask &&other) = default;
148  _InvokerTask(const _InvokerTask &other) = delete;
149  _InvokerTask &operator=(const _InvokerTask &other) = delete;
150 
151  void operator()() const {
152  TfErrorMark m;
153  _fn();
154  if (!m.IsClean())
155  WorkDispatcher::_TransportErrors(m, _errors);
156  }
157  private:
158  Fn _fn;
159  _ErrorTransports *_errors;
160  };
161 #else
162  template <class Fn>
163  struct _InvokerTask : public tbb::task {
164  explicit _InvokerTask(Fn &&fn, _ErrorTransports *err)
165  : _fn(std::move(fn)), _errors(err) {}
166 
167  explicit _InvokerTask(Fn const &fn, _ErrorTransports *err)
168  : _fn(fn), _errors(err) {}
169 
170  virtual tbb::task* execute() {
171  TfErrorMark m;
172  // In anticipation of OneTBB, ensure that _fn meets OneTBB's
173  // requirement that a task's call operator must be const.
174  const_cast<_InvokerTask const *>(this)->_fn();
175  if (!m.IsClean())
176  WorkDispatcher::_TransportErrors(m, _errors);
177  return NULL;
178  }
179  private:
180  Fn _fn;
181  _ErrorTransports *_errors;
182  };
183 
184  // Make an _InvokerTask instance, letting the function template deduce Fn.
185  template <class Fn>
187  _MakeInvokerTask(Fn &&fn) {
188  return *new( _rootTask->allocate_additional_child_of(*_rootTask) )
189  _InvokerTask<typename std::remove_reference<Fn>::type>(
190  std::forward<Fn>(fn), &_errors);
191  }
192 #endif
193 
194  // Helper function that removes errors from \p m and stores them in a new
195  // entry in \p errors.
196  WORK_API static void
197  _TransportErrors(const TfErrorMark &m, _ErrorTransports *errors);
198 
199  // Task group context to run tasks in.
200  tbb::task_group_context _context;
201 #if TBB_INTERFACE_VERSION_MAJOR >= 12
202  // Custom task group that lets us implement thread safe concurrent wait.
203  class _TaskGroup : public tbb::task_group {
204  public:
205  _TaskGroup(tbb::task_group_context& ctx) : tbb::task_group(ctx) {}
206  inline tbb::detail::d1::wait_context& _GetInternalWaitContext();
207  };
208 
209  _TaskGroup _taskGroup;
210 #else
211  // Root task that allows us to cancel tasks invoked directly by this
212  // dispatcher.
213  tbb::empty_task* _rootTask;
214 #endif
215  std::atomic<bool> _isCancelled;
216 
217  // The error transports we use to transmit errors in other threads back to
218  // this thread.
219  _ErrorTransports _errors;
220 
221  // Concurrent calls to Wait() have to serialize certain cleanup operations.
222  std::atomic_flag _waitCleanupFlag;
223 };
224 
225 // Wrapper class for non-const tasks.
226 template <class Fn>
228  explicit Work_DeprecatedMutableTask(Fn &&fn)
229  : _fn(std::move(fn)) {}
230 
231  explicit Work_DeprecatedMutableTask(Fn const &fn)
232  : _fn(fn) {}
233 
234  // Ensure only moves happen, no copies.
236  (Work_DeprecatedMutableTask &&other) = default;
238  (const Work_DeprecatedMutableTask &other) = delete;
240  &operator= (const Work_DeprecatedMutableTask &other) = delete;
241 
242  void operator()() const {
243  _fn();
244  }
245 private:
246  mutable Fn _fn;
247 };
248 
249 // Wrapper function to convert non-const tasks to a Work_DeprecatedMutableTask.
250 // When adding new tasks refrain from using this wrapper, instead ensure the
251 // call operator of the task is const such that it is compatible with oneTBB.
252 template <typename Fn>
256  (std::forward<Fn>(fn));
257 }
258 
259 ///////////////////////////////////////////////////////////////////////////////
260 
262 
263 #endif // PXR_BASE_WORK_DISPATCHER_H
type
Definition: core.h:556
Work_DeprecatedMutableTask(Fn const &fn)
Definition: dispatcher.h:231
void Run(Callable &&c, A0 &&a0, Args &&...args)
Definition: dispatcher.h:104
#define WORK_API
Definition: api.h:23
void operator()() const
Definition: dispatcher.h:242
void Run(Callable &&c)
Definition: dispatcher.h:95
bool IsClean() const
Definition: errorMark.h:82
WorkDispatcher & operator=(WorkDispatcher const &)=delete
WORK_API bool IsCancelled() const
WORK_API ~WorkDispatcher() noexcept
Wait() for any pending tasks to complete, then destroy the dispatcher.
PXR_NAMESPACE_CLOSE_SCOPE PXR_NAMESPACE_OPEN_SCOPE
Definition: path.h:1425
Work_DeprecatedMutableTask< typename std::remove_reference_t< Fn > > WorkMakeDeprecatedMutableTask(Fn &&fn)
Definition: dispatcher.h:254
WORK_API WorkDispatcher()
Construct a new dispatcher.
Work_DeprecatedMutableTask(Fn &&fn)
Definition: dispatcher.h:228
#define PXR_NAMESPACE_CLOSE_SCOPE
Definition: pxr.h:74
**If you just want to fire and args
Definition: thread.h:618
WORK_API void Wait()
Block until the work started by Run() completes.
Work_DeprecatedMutableTask & operator=(const Work_DeprecatedMutableTask &other)=delete
WORK_API void Cancel()