HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
threadpool.h
Go to the documentation of this file.
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7  http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 /* Modifications Copyright (c) Microsoft. */
17 
18 #pragma once
#include <algorithm>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "core/common/common.h"
#include "core/platform/env.h"
28 
29 // ORT thread pool overview
30 // ------------------------
31 //
32 // The ORT thread pool implementation is split into two layers. This
33 // file provides the high-level component. See the accompanying
34 // comments in EigenNonBlockingThreadPool.h for the low-level
35 // component.
36 //
37 // threadpool.h defines the user-facing functions for use in
38 // operators. The main abstractions are parallel loops
39 // (ThreadPool::TryParallelFor*), although we also support scheduling
40 // of asynchronous tasks (ThreadPool::Schedule), and the construction
41 // of multi-loop parallel sections (ThreadPool::ParallelSection).
42 //
43 // This high level API is accessed via static methods on the
44 // ThreadPool class. These methods map the operations onto one of
45 // three low-level implementations: (#1) direct execution of the
46 // operations if there is no thread pool configured, (#2) execution of
47 // the operations using the modified Eigen threadpool, (#3) execution
48 // of the operations using OpenMP. Option #1 enables execution in
49 // simple settings without needing threads. Option #2 is the
50 // preferred approach for use in settings with parallelism.
51 //
52 // The high-level part of the thread pool is responsible for:
53 //
54 // - Exposing the desired degree of parallelism to user code, and to
55 // libraries such as MLAS. This lets the libraries tailor the
56 // extent to which they parallelize work.
57 //
58 // - Handling trivial cases (such as directly running parallel loops
59 // with only a single iteration, or with no iterations at all).
60 //
61 // - Deciding how to divide work efficiently between the threads
62 // available.
63 //
64 // The ThreadPool::TryParallelFor methods do this based on cost
65 // estimates supplied by the caller, and are designed to support
66 // loops with small amounts of work per iteration. The loop body is
67 // supplied as a function taking a [start,end) range of iterations
68 // to execute (avoiding the need for per-iteration std::function
69 // calls, or a reliance upon inlining to avoid those calls).
70 //
71 // ThreadPool::TrySimpleParallelFor uses a simpler single-iteration
72 // API based on the assumption that the caller has divided work to
73 // an appropriate granularity.
74 //
75 // - When used with the Eigen-based thread pool, the implementation of
76 // all of the loops maps down onto
77 // ThreadPool::ParallelForFixedBlockSizeScheduling. This method
78 // takes the degree of parallelism (d_of_p) and work distribution
79 // block size (from the cost-based heuristics), and creates a set of
80 // tasks in the underlying thread pool (via
81 // ThreadPool::RunInParallel).
82 //
83 // These tasks then run a loop which picks off batches of iterations
84 // from the user's code. The distribution of these batches is
85 // handled dynamically via LoopCounter::ClaimIterations. This
86 // dynamic balancing behavior helps make performance robust to any
87 // variability in the execution time across iterations, and to
88 // situations such as multiple loops running concurrently on the
89 // same thread pool.
90 //
91 // - When running a series of loops inside a parallel section, the
92 // LoopCounter also helps obtain affinity between these loops (i.e.,
93 // iteration X of one loop will tend to run on the same thread that
94 // ran iteration X of prior loops). This locality helps improve hit
95 // rates in per-core caches across the series of short loops used in
96 // operators like GRU.
97 //
98 // There are some known areas for exploration here:
99 //
100 // - The cost-based heuristics were developed prior to recent changes
101 // to the thread pool. The heuristics seem to work well, but we
102 // should revisit the tuning periodically.
103 //
104 // - Can we unify the APIs for the different kinds of parallel loop?
105 //
106 // In particular, we may be able to replace the current use of
107 // TryBatchParallelFor with appropriate costs for each call site,
108 // and then use TryParallelFor. This would allow for more dynamic
109 // re-balancing of work between threads than the current
110 // ThreadPool::PartitionWork function provides.
111 //
112 // - Given the extensive modifications to original Eigen code, should
113 // we separate that out as a new class and remove the dependence on
114 // other Eigen components?
115 
116 // This file uses PIMPL to avoid including Eigen headers here
117 namespace Eigen {
118 class Allocator;
119 class ThreadPoolInterface;
120 } // namespace Eigen
121 
122 namespace onnxruntime {
123 
// Per-unit cost estimate consumed by the cost-based parallel-loop
// heuristics (it is the "cost_per_unit" argument of
// ThreadPool::TryParallelFor).
//
// The struct is aggregate-initialized positionally elsewhere in this file
// as TensorOpCost{bytes_loaded, bytes_stored, compute_cycles}, so all three
// members must exist and must stay in this order.
struct TensorOpCost {
  double bytes_loaded;    // Bytes loaded per unit of work.
  double bytes_stored;    // Bytes stored per unit of work.
  double compute_cycles;  // Estimated CPU cycles per unit of work.
};
129 
130 
131 namespace concurrency {
132 
133 template <typename Environment>
134 class ThreadPoolTempl;
135 
136 class ExtendedThreadPoolInterface;
137 class LoopCounter;
138 class ThreadPoolParallelSection;
139 
140 class ThreadPool {
141  public:
142 #ifdef _WIN32
143  using NAME_CHAR_TYPE = wchar_t;
144 #else
145  using NAME_CHAR_TYPE = char;
146 #endif
147  // Constructs a pool for running with "degree_of_parallelism" threads with
148  // specified "name". env->StartThread() is used to create individual threads
149  // with the given ThreadOptions. If "low_latency_hint" is true the thread pool
150  // implementation may use it as a hint that lower latency is preferred at the
151  // cost of higher CPU usage, e.g. by letting one or more idle threads spin
152  // wait. Conversely, if the threadpool is used to schedule high-latency
153  // operations like I/O the hint should be set to false.
154  //
155  // REQUIRES: degree_of_parallelism > 0
156  ThreadPool(Env* env,
157  const ThreadOptions& thread_options,
158  const NAME_CHAR_TYPE* name,
159  int degree_of_parallelism,
160  bool low_latency_hint,
161  bool force_hybrid = false);
162 
163  // Waits until all scheduled work has finished and then destroys the
164  // set of threads.
165  ~ThreadPool();
166 
167  // Start and end a multi-loop parallel section. Parallel loops can
168  // be executed directly (without using this API), but entering a
169  // parallel section allows the runtime system to amortize loop
170  // entry/exit costs over multiple loops, and allows it to promote
171  // affinity between corresponding iterations of different loops.
172  //
173  // Multi-loop sections would typically be used in cases where a
174  // series of loops executes without much code in between them, and
175  // where it is impractical to refactor code into a single loop. For
176  // instance:
177  //
178  // {
179  // onnxruntime::concurrency::ThreadPool::ParallelSection ps(tp);
180  // for (int x = 0; x < seq_len; x++) {
181  // TrySimpleParallelFor(tp, 16, [&](std::ptrdiff_t i) { ... });
182  // }
183  // }
184  //
185  // The parallel section is entered via the constructor of
186  // ThreadPool::ParallelSection, and exited via the destructor.
187  // Currently, thread-local state is used to track whether or not the
188  // current thread is inside a parallel section. In contrast to
189  // handling parallel section objects explicitly in user code, this
190  // approach allows code such as MLAS to operate with/without the use
191  // of parallel sections.
192  //
193  // Parallel sections are only implemented with the Eigen threadpool.
194  // They have no effect when using OpenMP.
195  //
196  // Parallel sections may not be nested, and may not be used inside
197  // parallel loops.
198 
200  public:
201  explicit ParallelSection(ThreadPool *tp);
203 
204  private:
205  friend class ThreadPool;
206 
207  // Owning reference for the underlying ThreadPoolParallelSection
208  // which implements the thread management. We use an explicit
209  // deleter here so that the definition of
210  // ThreadPoolParallelSection does not need to be available at this
211  // point to avoid a dependence on the Eigen headers.
212  ThreadPoolParallelSection* ps_{nullptr};
213  ThreadPool *tp_;
215  };
216 
217  // The API below allows spinning to be enabled or disabled.
218  // This is used to support real-time scenarios where
219  // spinning between relatively infrequent requests
220  // contributes to high CPU usage while not processing anything.
221  void EnableSpinning();
222 
223  void DisableSpinning();
224 
225  // Schedules fn() for execution in the pool of threads. The function may run
226  // synchronously if it cannot be enqueued. This will occur if the thread pool's
227  // degree-of-parallelism is 1, but it may also occur for implementation-dependent
228  // reasons such as if queues used for buffering work are full.
229  static void Schedule(ThreadPool* tp,
230  std::function<void()> fn) {
231  if (tp) {
232  tp->Schedule(fn);
233  } else {
234  fn();
235  }
236  }
237 
238  // ParallelFor shards the "total" units of work assuming each unit of work
239  // having roughly "cost_per_unit" cost, in cycles. Each unit of work is
240  // indexed 0, 1, ..., total - 1. Each shard contains 1 or more units of work
241  // and the total cost of each shard is roughly the same.
242  //
243  // "cost_per_unit" is an estimate of the number of CPU cycles (or nanoseconds
244  // if not CPU-bound) to complete a unit of work. Overestimating creates too
245  // many shards and CPU time will be dominated by per-shard overhead, such as
246  // Context creation. Underestimating may not fully make use of the specified
247  // parallelism, and may also cause inefficiencies due to load balancing
248  // issues and stragglers.
249 
250  static void TryParallelFor(ThreadPool* tp, std::ptrdiff_t total, double cost_per_unit,
251  const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn) {
252  TryParallelFor(tp, total, TensorOpCost{0, 0, static_cast<double>(cost_per_unit)}, fn);
253  }
254 
255  static void TryParallelFor(ThreadPool* tp, std::ptrdiff_t total, const TensorOpCost& cost_per_unit,
256  const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn);
257 
258  // Directly schedule the 'total' tasks to the underlying threadpool, without
259  // cutting them by halves
260 
261  inline static void TrySimpleParallelFor(ThreadPool* tp, std::ptrdiff_t total,
262  const std::function<void(std::ptrdiff_t)>& fn) {
263  if (tp != nullptr) {
264  tp->SimpleParallelFor(total, fn);
265  } else {
266  for (std::ptrdiff_t i = 0; i < total; ++i) {
267  // In many cases, fn can be inlined here.
268  fn(i);
269  }
270  }
271  }
272 
273  /**
274  * Tries to call the given function in parallel, with calls split into (num_batches) batches.
275  *\param num_batches If it is zero, it will be replaced to the value of DegreeOfParallelism().
276  *\param fn A std::function or STL style functor with signature of "void f(std::ptrdiff_t);"
277  * Pitfall: Caller should cap `num_batches` to a reasonable value based on the cost of `fn` and the value of `total`.
278  *For example, if fn is as simple as: int sum=0; fn = [&](int i){sum +=i;} and `total` is 100, then num_batches should
279  *be just 1.
280  *
281  * ```
282  **/
283  template <typename F>
284  inline static void TryBatchParallelFor(ThreadPool* tp, std::ptrdiff_t total, F&& fn, std::ptrdiff_t num_batches) {
285  if (tp == nullptr) {
286  for (std::ptrdiff_t i = 0; i < total; ++i) {
287  // In many cases, fn can be inlined here.
288  fn(i);
289  }
290  return;
291  }
292  if (total <= 0)
293  return;
294 
295  if (total == 1) {
296  fn(0);
297  return;
298  }
299 
300  if (num_batches <= 0) {
301  num_batches = std::min<std::ptrdiff_t>(total, DegreeOfParallelism(tp));
302  }
303 
304  if (num_batches <= 1) {
305  for (int i = 0; i < total; i++) {
306  fn(i);
307  }
308  return;
309  }
310 
311  tp->SimpleParallelFor(num_batches, [&](std::ptrdiff_t batch_index) {
312  auto work = PartitionWork(batch_index, num_batches, total);
313  for (std::ptrdiff_t i = work.start; i < work.end; i++) {
314  fn(i);
315  }
316  });
317  }
318 
// Half-open range [start, end) of work-item indices assigned to one batch.
struct WorkInfo {
  std::ptrdiff_t start{0};
  std::ptrdiff_t end{0};
};

/** Calculate the start and end offsets for a batch.
@remarks Based on MlasPartitionWork

The total_work items are split into num_batches contiguous ranges. Every
batch gets (total_work / num_batches) items, and the first
(total_work % num_batches) batches get one extra item each.

@param batch_idx Index of the batch whose range is requested.
@param num_batches Number of batches the work is divided into.
@param total_work Total number of work items.
@return The [start, end) range for batch_idx. If num_batches is not
        positive there are no batches to assign work to, and an empty
        range {0, 0} is returned instead of dividing by zero.
*/
constexpr static WorkInfo PartitionWork(std::ptrdiff_t batch_idx, std::ptrdiff_t num_batches, std::ptrdiff_t total_work) {
  WorkInfo info;
  if (num_batches <= 0) {
    // Guard the division/modulo below against a zero (or negative) divisor.
    return info;
  }

  const std::ptrdiff_t work_per_batch = total_work / num_batches;
  const std::ptrdiff_t work_per_batch_extra = total_work % num_batches;

  if (batch_idx < work_per_batch_extra) {
    // This batch is one of the leading batches that absorb the remainder.
    info.start = (work_per_batch + 1) * batch_idx;
    info.end = info.start + work_per_batch + 1;
  } else {
    // All remainder items were handed out to earlier batches.
    info.start = work_per_batch * batch_idx + work_per_batch_extra;
    info.end = info.start + work_per_batch;
  }

  return info;
}
342 
343  //......................................................................
344  //
345  // The following static methods take into account whether OpenMP is
346  // enabled/disabled, and if the thread pool pointer is nullptr
347  // during sequential execution.
348 
349  // Provide a hint to the caller for whether or not to parallelize
350  // work. This lets a caller switch to a sequential version of an
351  // algorithm rather than using calls via the ParallelFor functions.
352 
353  static bool ShouldParallelize(const ThreadPool* tp);
354 
355  // Return the degree of parallelism that code should assume when using the thread pool.
356  // It decouples the degree of parallelism for use with the thread pool from
357  // the implementation choice of whether this matches the number of threads created in
358  // the pool.
359  //
360  // Currently, a loop with degree-of-parallelism N is supported by a pool of N-1 threads
361  // working in combination with the thread initiating the loop.
362  static int DegreeOfParallelism(const ThreadPool* tp);
363 
365 
366  // StartProfiling and StopProfiling are not to be consumed as public-facing API
367  static void StartProfiling(concurrency::ThreadPool* tp);
368  static std::string StopProfiling(concurrency::ThreadPool* tp);
369 
370  private:
371  friend class LoopCounter;
372 
373  // Returns the number of threads created in the pool. This may be different from the
374  // value returned by DegreeOfParallelism to code using the pool.
375  int NumThreads() const;
376 
377  // Returns current thread id between 0 and NumThreads() - 1, if called from a
378  // thread in the pool. Returns -1 otherwise.
379  int CurrentThreadId() const;
380 
381  // Run fn with up to n degree-of-parallelism enlisting the thread pool for
382  // help. The degree-of-parallelism includes the caller, and so if n==1
383  // then the function will run directly in the caller. The fork-join
384  // synchronization is handled in the thread pool, and so any state captured
385  // by fn() is safe from concurrent access once RunInParallel returns.
386  void RunInParallel(std::function<void(unsigned idx)> fn, unsigned n, std::ptrdiff_t block_size);
387 
388  // Divides the work represented by the range [0, total) into k shards.
389  // Calls fn(i*block_size, (i+1)*block_size) from the ith shard (0 <= i < k).
390  // Each shard may be executed on a different thread in parallel, depending on
391  // the number of threads available in the pool.
392  // When (i+1)*block_size > total, fn(i*block_size, total) is called instead.
393  // Requires 0 < block_size <= total.
394  void ParallelForFixedBlockSizeScheduling(std::ptrdiff_t total, std::ptrdiff_t block_size,
395  const std::function<void(std::ptrdiff_t, std::ptrdiff_t)>& fn);
396 
397  // Return whether or not the calling thread should run a loop of
398  // num_iterations divided in chunks of block_size in parallel. If not,
399  // the caller should run the loop sequentially.
400  bool ShouldParallelizeLoop(const std::ptrdiff_t num_iterations,
401  const std::ptrdiff_t block_size = 1) const;
402 
403  // Internal (non-static) parallel loop methods. Unlike the public static methods,
404  // these will not handle the cases of OpenMP builds, or builds without a threadpool.
405  void ParallelFor(std::ptrdiff_t total, double cost_per_unit,
406  const std::function<void(std::ptrdiff_t first, std::ptrdiff_t last)>& fn);
407 
408  void ParallelFor(std::ptrdiff_t total, const TensorOpCost& cost_per_unit,
409  const std::function<void(std::ptrdiff_t first, std::ptrdiff_t)>& fn);
410 
411  void SimpleParallelFor(std::ptrdiff_t total, const std::function<void(std::ptrdiff_t)>& fn);
412 
413  void Schedule(std::function<void()> fn);
414 
415  void StartProfiling();
416 
418 
419  ThreadOptions thread_options_;
420 
421  // If a thread pool is created with degree_of_parallelism != 1 then an underlying
422  // EigenThreadPool is used to create OS threads and handle work distribution to them.
423  // If degree_of_parallelism == 1 then underlying_threadpool_ is left as nullptr
424  // and parallel work is run directly by the caller.
425  ExtendedThreadPoolInterface* underlying_threadpool_ = nullptr;
426 
427  // If used, underlying_threadpool_ is instantiated and owned by the ThreadPool.
428  std::unique_ptr<ThreadPoolTempl<Env> > extended_eigen_threadpool_;
429 
430  // Force the thread pool to run in hybrid mode on a normal cpu.
431  bool force_hybrid_ = false;
432 };
433 
434 } // namespace concurrency
435 } // namespace onnxruntime
GLint first
Definition: glcorearb.h:405
GLuint start
Definition: glcorearb.h:475
GLsizei const GLchar *const * string
Definition: glcorearb.h:814
static void TrySimpleParallelFor(ThreadPool *tp, std::ptrdiff_t total, const std::function< void(std::ptrdiff_t)> &fn)
Definition: threadpool.h:261
static void Schedule(ThreadPool *tp, std::function< void()> fn)
Definition: threadpool.h:229
static void TryBatchParallelFor(ThreadPool *tp, std::ptrdiff_t total, F &&fn, std::ptrdiff_t num_batches)
Definition: threadpool.h:284
ThreadPool(Env *env, const ThreadOptions &thread_options, const NAME_CHAR_TYPE *name, int degree_of_parallelism, bool low_latency_hint, bool force_hybrid=false)
static bool ShouldParallelize(const ThreadPool *tp)
static void TryParallelFor(ThreadPool *tp, std::ptrdiff_t total, double cost_per_unit, const std::function< void(std::ptrdiff_t first, std::ptrdiff_t last)> &fn)
Definition: threadpool.h:250
GLdouble n
Definition: glcorearb.h:2008
GLuint GLuint end
Definition: glcorearb.h:475
#define ORT_DISALLOW_COPY_ASSIGNMENT_AND_MOVE(TypeName)
Definition: common.h:220
GLuint const GLchar * name
Definition: glcorearb.h:786
static int DegreeOfParallelism(const ThreadPool *tp)
static std::string StopProfiling(concurrency::ThreadPool *tp)
static void StartProfiling(concurrency::ThreadPool *tp)
static constexpr WorkInfo PartitionWork(std::ptrdiff_t batch_idx, std::ptrdiff_t num_batches, std::ptrdiff_t total_work)
Definition: threadpool.h:327