HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
dispatcher.h
Go to the documentation of this file.
1 //
2 // Copyright 2016 Pixar
3 //
4 // Licensed under the Apache License, Version 2.0 (the "Apache License")
5 // with the following modification; you may not use this file except in
6 // compliance with the Apache License and the following modification to it:
7 // Section 6. Trademarks. is deleted and replaced with:
8 //
9 // 6. Trademarks. This License does not grant permission to use the trade
10 // names, trademarks, service marks, or product names of the Licensor
11 // and its affiliates, except as required to comply with Section 4(c) of
12 // the License and to reproduce the content of the NOTICE file.
13 //
14 // You may obtain a copy of the Apache License at
15 //
16 // http://www.apache.org/licenses/LICENSE-2.0
17 //
18 // Unless required by applicable law or agreed to in writing, software
19 // distributed under the Apache License with the above modification is
20 // distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
21 // KIND, either express or implied. See the Apache License for the specific
22 // language governing permissions and limitations under the Apache License.
23 //
24 #ifndef PXR_BASE_WORK_DISPATCHER_H
25 #define PXR_BASE_WORK_DISPATCHER_H
26 
27 /// \file work/dispatcher.h
28 
29 #include "pxr/pxr.h"
31 #include "pxr/base/work/api.h"
32 
33 #include "pxr/base/tf/errorMark.h"
35 
36 #include <tbb/concurrent_vector.h>
37 #include <tbb/task.h>
38 
39 #include <functional>
40 #include <type_traits>
41 #include <utility>
42 
44 
45 /// \class WorkDispatcher
46 ///
47 /// A work dispatcher runs concurrent tasks. The dispatcher supports adding
48 /// new tasks from within running tasks. This suits problems that exhibit
49 /// hierarchical structured parallelism: tasks that discover additional tasks
50 /// during their execution.
51 ///
52 /// Typical use is to create a dispatcher and invoke Run() to begin doing
53 /// work, then Wait() for the work to complete. Tasks may invoke Run() during
54 /// their execution as they discover additional tasks to perform.
55 ///
56 /// For example,
57 ///
58 /// \code
59 /// WorkDispatcher dispatcher;
60 /// for (i = 0; i != N; ++i) {
61 /// dispatcher.Run(DoSomeWork, workItem[i]);
62 /// }
63 /// dispatcher.Wait();
64 /// \endcode
65 ///
66 /// Calls to Run() and Cancel() may be made concurrently. However, once Wait()
67 /// is called, calls to Run() and Cancel() must only be made by tasks already
68 /// added by Run(). Additionally, Wait() must never be called by a task added by
69 /// Run(), since that task could never complete.
70 ///
72 {
73 public:
74  /// Construct a new dispatcher.
76 
77  /// Wait() for any pending tasks to complete, then destroy the dispatcher.
79 
80  WorkDispatcher(WorkDispatcher const &) = delete;
81  WorkDispatcher &operator=(WorkDispatcher const &) = delete;
82 
83 #ifdef doxygen
84 
85  /// Add work for the dispatcher to run.
86  ///
87  /// Before a call to Wait() is made it is safe for any client to invoke
88  /// Run(). Once Wait() is invoked, it is \b only safe to invoke Run() from
89  /// within the execution of tasks already added via Run().
90  ///
91  /// This function does not block, in general. It may block if concurrency
92  /// is limited to 1. The added work may be not yet started, may be started
93  /// but not completed, or may be completed upon return. No guarantee is
94  /// made.
95  template <class Callable, class A1, class A2, ... class AN>
96  void Run(Callable &&c, A1 &&a1, A2 &&a2, ... AN &&aN);
97 
98 #else // doxygen
99 
100  template <class Callable>
101  inline void Run(Callable &&c) {
102  _rootTask->spawn(_MakeInvokerTask(std::forward<Callable>(c)));
103  }
104 
105  template <class Callable, class A0, class ... Args>
106  inline void Run(Callable &&c, A0 &&a0, Args&&... args) {
107  Run(std::bind(std::forward<Callable>(c),
108  std::forward<A0>(a0),
109  std::forward<Args>(args)...));
110  }
111 
112 #endif // doxygen
113 
114  /// Block until the work started by Run() completes.
115  WORK_API void Wait();
116 
117  /// Cancel remaining work and return immediately.
118  ///
119  /// Calling this function affects task that are being run directly
120  /// by this dispatcher. If any of these tasks are using their own
121  /// dispatchers to run tasks, these dispatchers will not be affected
122  /// and these tasks will run to completion, unless they are also
123  /// explicitly cancelled.
124  ///
125  /// This call does not block. Call Wait() after Cancel() to wait for
126  /// pending tasks to complete.
127  WORK_API void Cancel();
128 
129 private:
130  typedef tbb::concurrent_vector<TfErrorTransport> _ErrorTransports;
131 
132  // Function invoker helper that wraps the invocation with an ErrorMark so we
133  // can transmit errors that occur back to the thread that Wait() s for tasks
134  // to complete.
135  template <class Fn>
136  struct _InvokerTask : public tbb::task {
137  explicit _InvokerTask(Fn &&fn, _ErrorTransports *err)
138  : _fn(std::move(fn)), _errors(err) {}
139 
140  explicit _InvokerTask(Fn const &fn, _ErrorTransports *err)
141  : _fn(fn), _errors(err) {}
142 
143  virtual tbb::task* execute() {
144  TfErrorMark m;
145  _fn();
146  if (!m.IsClean())
147  WorkDispatcher::_TransportErrors(m, _errors);
148  return NULL;
149  }
150  private:
151  Fn _fn;
152  _ErrorTransports *_errors;
153  };
154 
155  // Make an _InvokerTask instance, letting the function template deduce Fn.
156  template <class Fn>
158  _MakeInvokerTask(Fn &&fn) {
159  return *new( _rootTask->allocate_additional_child_of(*_rootTask) )
160  _InvokerTask<typename std::remove_reference<Fn>::type>(
161  std::forward<Fn>(fn), &_errors);
162  }
163 
164  // Helper function that removes errors from \p m and stores them in a new
165  // entry in \p errors.
166  WORK_API static void
167  _TransportErrors(const TfErrorMark &m, _ErrorTransports *errors);
168 
169  // Task group context and associated root task that allows us to cancel
170  // tasks invoked directly by this dispatcher.
171  tbb::task_group_context _context;
172  tbb::empty_task* _rootTask;
173 
174  // The error transports we use to transmit errors in other threads back to
175  // this thread.
176  _ErrorTransports _errors;
177 };
178 
179 ///////////////////////////////////////////////////////////////////////////////
180 
182 
183 #endif // PXR_BASE_WORK_DISPATCHER_H
type
Definition: core.h:977
const GLfloat * c
Definition: glew.h:16631
void Run(Callable &&c, A0 &&a0, Args &&...args)
Definition: dispatcher.h:106
#define WORK_API
Definition: api.h:40
void Run(Callable &&c)
Definition: dispatcher.h:101
bool IsClean() const
Definition: errorMark.h:99
WorkDispatcher & operator=(WorkDispatcher const &)=delete
PXR_NAMESPACE_CLOSE_SCOPE PXR_NAMESPACE_OPEN_SCOPE
Definition: path.h:1375
WORK_API WorkDispatcher()
Construct a new dispatcher.
#define PXR_NAMESPACE_CLOSE_SCOPE
Definition: pxr.h:91
const GLdouble * m
Definition: glew.h:9166
**If you just want to fire and args
Definition: thread.h:615
WORK_API ~WorkDispatcher()
Wait() for any pending tasks to complete, then destroy the dispatcher.
WORK_API void Wait()
Block until the work started by Run() completes.
WORK_API void Cancel()