HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
dispatcher.h
Go to the documentation of this file.
1 //
2 // Copyright 2016 Pixar
3 //
4 // Licensed under the Apache License, Version 2.0 (the "Apache License")
5 // with the following modification; you may not use this file except in
6 // compliance with the Apache License and the following modification to it:
7 // Section 6. Trademarks. is deleted and replaced with:
8 //
9 // 6. Trademarks. This License does not grant permission to use the trade
10 // names, trademarks, service marks, or product names of the Licensor
11 // and its affiliates, except as required to comply with Section 4(c) of
12 // the License and to reproduce the content of the NOTICE file.
13 //
14 // You may obtain a copy of the Apache License at
15 //
16 // http://www.apache.org/licenses/LICENSE-2.0
17 //
18 // Unless required by applicable law or agreed to in writing, software
19 // distributed under the Apache License with the above modification is
20 // distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
21 // KIND, either express or implied. See the Apache License for the specific
22 // language governing permissions and limitations under the Apache License.
23 //
24 #ifndef PXR_BASE_WORK_DISPATCHER_H
25 #define PXR_BASE_WORK_DISPATCHER_H
26 
27 /// \file work/dispatcher.h
28 
29 #include "pxr/pxr.h"
31 #include "pxr/base/work/api.h"
32 
33 #include "pxr/base/tf/errorMark.h"
35 
36 #include <tbb/concurrent_vector.h>
37 #include <tbb/task.h>
38 
39 #include <functional>
40 #include <type_traits>
41 #include <utility>
42 
44 
45 /// \class WorkDispatcher
46 ///
47 /// A work dispatcher runs concurrent tasks. The dispatcher supports adding
48 /// new tasks from within running tasks. This suits problems that exhibit
49 /// hierarchical structured parallelism: tasks that discover additional tasks
50 /// during their execution.
51 ///
52 /// Typical use is to create a dispatcher and invoke Run() to begin doing
53 /// work, then Wait() for the work to complete. Tasks may invoke Run() during
54 /// their execution as they discover additional tasks to perform.
55 ///
56 /// For example,
57 ///
58 /// \code
59 /// WorkDispatcher dispatcher;
60 /// for (i = 0; i != N; ++i) {
61 /// dispatcher.Run(DoSomeWork, workItem[i]);
62 /// }
63 /// dispatcher.Wait();
64 /// \endcode
65 ///
66 /// Calls to Run() and Cancel() may be made concurrently. Calls to Wait() may
67 /// also be made concurrently. However, once any calls to Wait() are in-flight,
68 /// calls to Run() and Cancel() must only be made by tasks already added by
69 /// Run(). This means that users of this class are responsible to synchronize
70 /// concurrent calls to Wait() to ensure this requirement is met.
71 ///
72 /// Additionally, Wait() must never be called by a task added by Run(), since
73 /// that task could never complete.
74 ///
76 {
77 public:
78  /// Construct a new dispatcher.
80 
81  /// Wait() for any pending tasks to complete, then destroy the dispatcher.
83 
84  WorkDispatcher(WorkDispatcher const &) = delete;
85  WorkDispatcher &operator=(WorkDispatcher const &) = delete;
86 
87 #ifdef doxygen
88 
89  /// Add work for the dispatcher to run.
90  ///
91  /// Before a call to Wait() is made it is safe for any client to invoke
92  /// Run(). Once Wait() is invoked, it is \b only safe to invoke Run() from
93  /// within the execution of tasks already added via Run().
94  ///
95  /// This function does not block, in general. It may block if concurrency
96  /// is limited to 1. The added work may be not yet started, may be started
97  /// but not completed, or may be completed upon return. No guarantee is
98  /// made.
99  template <class Callable, class A1, class A2, ... class AN>
100  void Run(Callable &&c, A1 &&a1, A2 &&a2, ... AN &&aN);
101 
102 #else // doxygen
103 
104  template <class Callable> // Overload for a callable that is invoked with no arguments.
105  inline void Run(Callable &&c) {
106  _rootTask->spawn(_MakeInvokerTask(std::forward<Callable>(c))); // Wrap c in an invoker task and spawn it through _rootTask.
107  }
108 
109  template <class Callable, class A0, class ... Args> // Overload for a callable plus one or more arguments.
110  inline void Run(Callable &&c, A0 &&a0, Args&&... args) {
111  Run(std::bind(std::forward<Callable>(c), // Bind the arguments to c now, then pass the bound callable to Run(Callable&&).
112  std::forward<A0>(a0),
113  std::forward<Args>(args)...));
114  }
115 
116 #endif // doxygen
117 
118  /// Block until the work started by Run() completes.
119  WORK_API void Wait();
120 
121  /// Cancel remaining work and return immediately.
122  ///
123  /// Calling this function affects tasks that are being run directly
124  /// by this dispatcher. If any of these tasks are using their own
125  /// dispatchers to run tasks, these dispatchers will not be affected
126  /// and these tasks will run to completion, unless they are also
127  /// explicitly cancelled.
128  ///
129  /// This call does not block. Call Wait() after Cancel() to wait for
130  /// pending tasks to complete.
131  WORK_API void Cancel();
132 
133 private:
134  typedef tbb::concurrent_vector<TfErrorTransport> _ErrorTransports;
135 
136  // Function invoker helper that wraps the invocation with an ErrorMark so we
137  // can transmit errors that occur back to the thread that Wait()s for tasks
138  // to complete.
139  template <class Fn>
140  struct _InvokerTask : public tbb::task { // tbb::task that stores a callable and a pointer to an error-transport list.
141  explicit _InvokerTask(Fn &&fn, _ErrorTransports *err) // Rvalue overload: moves fn into _fn.
142  : _fn(std::move(fn)), _errors(err) {}
143 
144  explicit _InvokerTask(Fn const &fn, _ErrorTransports *err) // Const-reference overload: copies fn into _fn.
145  : _fn(fn), _errors(err) {}
146 
147  virtual tbb::task* execute() { // Invokes the stored callable and forwards any errors it raised.
148  TfErrorMark m; // Declared before the call so IsClean() can be checked afterwards.
149  _fn();
150  if (!m.IsClean()) // Only hand off to _TransportErrors when the mark is not clean.
151  WorkDispatcher::_TransportErrors(m, _errors);
152  return NULL; // Always returns a null task pointer.
153  }
154  private:
155  Fn _fn; // The callable run by execute().
156  _ErrorTransports *_errors; // List supplied by the creator; _MakeInvokerTask passes the dispatcher's &_errors.
157  };
158 
159  // Make an _InvokerTask instance, letting the function template deduce Fn.
160  template <class Fn>
162  _MakeInvokerTask(Fn &&fn) {
163  return *new( _rootTask->allocate_additional_child_of(*_rootTask) )
164  _InvokerTask<typename std::remove_reference<Fn>::type>(
165  std::forward<Fn>(fn), &_errors);
166  }
167 
168  // Helper function that removes errors from \p m and stores them in a new
169  // entry in \p errors.
170  WORK_API static void
171  _TransportErrors(const TfErrorMark &m, _ErrorTransports *errors);
172 
173  // Task group context and associated root task that allows us to cancel
174  // tasks invoked directly by this dispatcher.
175  tbb::task_group_context _context;
176  tbb::empty_task* _rootTask;
177 
178  // The error transports we use to transmit errors in other threads back to
179  // this thread.
180  _ErrorTransports _errors;
181 
182  // Concurrent calls to Wait() have to serialize certain cleanup operations.
183  std::atomic_flag _waitCleanupFlag;
184 };
185 
186 ///////////////////////////////////////////////////////////////////////////////
187 
189 
190 #endif // PXR_BASE_WORK_DISPATCHER_H
void Run(Callable &&c, A0 &&a0, Args &&...args)
Definition: dispatcher.h:110
#define WORK_API
Definition: api.h:40
void Run(Callable &&c)
Definition: dispatcher.h:105
bool IsClean() const
Definition: errorMark.h:99
WorkDispatcher & operator=(WorkDispatcher const &)=delete
PXR_NAMESPACE_CLOSE_SCOPE PXR_NAMESPACE_OPEN_SCOPE
Definition: path.h:1432
WORK_API WorkDispatcher()
Construct a new dispatcher.
#define PXR_NAMESPACE_CLOSE_SCOPE
Definition: pxr.h:91
**If you just want to fire and args
Definition: thread.h:609
WORK_API ~WorkDispatcher()
Wait() for any pending tasks to complete, then destroy the dispatcher.
WORK_API void Wait()
Block until the work started by Run() completes.
type
Definition: core.h:1059
WORK_API void Cancel()