EigenNonBlockingThreadPool.h
1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 /* Modifications Copyright (c) Microsoft. */
11 
12 #include <type_traits>
13 
14 #pragma once
15 #include "onnxruntime_config.h"
16 // build/external/eigen/unsupported/Eigen/CXX11/src/Tensor/TensorEvaluator.h:162:71:
17 // error: ignoring attributes on template argument "Eigen::PacketType<const float, Eigen::DefaultDevice>::type {aka
18 // __vector(4) float}" [-Werror=ignored-attributes]
19 #if defined(__GNUC__)
20 #pragma GCC diagnostic push
21 #pragma GCC diagnostic ignored "-Wunused-parameter"
22 #pragma GCC diagnostic ignored "-Wunused-result"
23 // cmake/external/eigen/unsupported/Eigen/CXX11/../../../Eigen/src/Core/arch/NEON/PacketMath.h:1633:9:
24 // error: ‘void* memcpy(void*, const void*, size_t)’ copying an object of non-trivial type ‘Eigen::internal::Packet4c’
25 // {aka ‘struct Eigen::internal::eigen_packet_wrapper<int, 2>’} from an array of ‘const int8_t’
26 // {aka ‘const signed char’} [-Werror=class-memaccess]
27 #ifdef HAS_CLASS_MEMACCESS
28 #pragma GCC diagnostic ignored "-Wclass-memaccess"
29 #endif
30 #elif defined(_MSC_VER)
31 #pragma warning(push)
32 #pragma warning(disable : 4127)
33 #pragma warning(disable : 4805)
34 #endif
35 #include <memory>
36 #include "unsupported/Eigen/CXX11/ThreadPool"
37 
38 #if defined(__GNUC__)
39 #pragma GCC diagnostic pop
40 #elif defined(_MSC_VER)
41 #pragma warning(pop)
42 #endif
43 #include "core/common/denormal.h"
44 #include "core/common/inlined_containers_fwd.h"
45 #include "core/common/spin_pause.h"
46 #include "core/platform/ort_mutex.h"
47 #include "core/platform/Barrier.h"
48 
49 // ORT thread pool overview
50 // ------------------------
51 //
52 // The ORT thread pool implementation is split into two layers. This
53 // file provides the low-level component. See the accompanying
54 // comments in threadpool.h for the high-level component.
55 //
56 // The code here is derived from the Eigen non-blocking thread pool,
57 // although many parts have been updated over time. The main
58 // abstractions used here are:
59 //
60 // - The thread pool maintains a set of OS threads running
61 // ThreadPoolTempl::WorkerLoop.
62 //
63 // Each thread has its own RunQueue object, holding a queue of tasks
64 // that have been pushed to the thread for execution. The main work
65 // loop is to pop a task from the head of the queue, and to execute
66 // it to completion. If the worker's run queue is empty then it
67 // will spin waiting for work, then attempt to steal tasks from
68 // other threads' queues, and then block in the OS if it cannot find
69 // work.
70 //
71 // This spin-then-block behavior is configured via a flag provided
72 // when creating the thread pool, and by the constant spin_count.
73 //
74 // - Although all tasks are simple void()->void functions,
75 // conceptually there are three different kinds:
76 //
77 // - One-shot tasks submitted externally via the Schedule() method.
78 // These tasks are used to support asynchronous work. These are
79 // used in the parallel executor, but otherwise are not widely
80 // used outside of test harnesses (see threadpool_test.cc for some
81 // examples).
82 //
83 // - Tasks for running a parallel loop.
84 //
85 // The tasks themselves are defined in threadpool.cc, and are
86 // submitted to the run queues via RunInParallel->SummonWorkers.
87 // Each task will loop internally, picking off iterations from the
88 // user's code via atomic-fetch-and-add, until the loop is
89 // complete.
90 //
91 // This two-layer approach lets us separate out the
92 // super-lightweight per-iteration-batch work from the more
93 // costly per-loop work of managing Task objects.
94 //
95 // - Tasks for running a parallel section. This is an extension of
96 // the approach taken for parallel loops. However, the Tasks are
97 // defined in this file, and can pick up iterations from a series
98 // of different parallel loops. The tasks are defined in
99 // RunInParallelSection->SummonWorkers.
100 //
101 // The additional layer of parallel sections is a further way to
102 // amortize costs: the work done creating the tasks can be
103 // performed once, and then exploited over a series of loops.
104 //
105 // There are a few aspects of the modified Eigen thread pool to
106 // highlight:
107 //
108 // - The run queues follow the usual approach of having push/pop
109 // operations on the front/back, and optimizing the PopFront case
110 // for single-threaded use by the thread owning the run queue.
111 // Two points to note here are:
112 //
113 // * We should experiment with simplifying these queues. In ORT, we
114 // use the CAS-based scheduling layer in threadpool.cc for the
115 // fine-grained allocation of individual loop iterations to worker
116 // threads. This means we do not have the form of recursive
117 // sub-division of work that motivates the original design.
118 //
119 // * We support an additional Revoke operation to replace an item in
120 // the middle of a queue with a tombstone. This operation is used
121 // at the end of parallel loops and parallel sections to remove
122 // any tasks that were created but not yet executed. Once
123 // revoked, a thread can rely on the fact that the task will no
124 // longer execute. Revocation helps manage captured state in
125 // parallel loops: the alternatives would be (i) waiting for all
126 // tasks that captured state to reach the head of their queues and
127 // execute, or (ii) use heap-allocated state in tasks, and use a
128 // technique such as reference counting to de-allocate it.
129 //
130 // To support revocation, each thread has a unique "Tag" to
131 // identify the items that it adds to the work queues. A thread
132 // can revoke an item only if it has the thread's own tag.
133 //
134 // - When entering a parallel loop (or parallel section), a thread
135 // maintains a set of "preferred" worker hints, and initially
136 // submits tasks to these workers.
137 // When a task executes, it updates the submitting thread's
138 // preferred workers to reflect the worker that the task ran on.
139 // Hence, if a task is submitted to thread T1's queue, and then
140 // stolen by T2 for execution, then T2 will become preferred.
141 //
142 // This "stickiness" aims to retain locality between successive
143 // loops submitted by the same thread, to maintain the same set of
144 // active threads over time (when the entire pool is not needed),
145 // and to allow concurrent requests to submit work to their own
146 // respective sets of preferred workers.
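//
// As an illustration of the second layer (not part of this file; the real
// implementation lives in threadpool.h/threadpool.cc), a parallel-loop task
// can claim batches of iterations with an atomic fetch-and-add over a shared
// cursor. The counts below are example values:
//
//   std::atomic<std::ptrdiff_t> next{0};      // shared iteration cursor
//   constexpr std::ptrdiff_t total = 1000;    // loop trip count
//   constexpr std::ptrdiff_t block = 64;      // iterations claimed per fetch_add
//
//   auto loop_task = [&](unsigned /*par_idx*/) {
//     std::ptrdiff_t start;
//     while ((start = next.fetch_add(block, std::memory_order_relaxed)) < total) {
//       const std::ptrdiff_t end = std::min(start + block, total);
//       for (std::ptrdiff_t i = start; i < end; ++i) {
//         // ... per-iteration user work ...
//       }
//     }
//   };
//
// Each worker enlisted for the loop runs loop_task with its own par_idx; the
// cost of handing out work is one atomic add per batch, which is what keeps
// the per-iteration-batch path lightweight.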
147 
148 namespace onnxruntime {
149 namespace concurrency {
150 
151 #ifdef _WIN32
152 using CHAR_TYPE = wchar_t;
153 #else
154 using CHAR_TYPE = char;
155 #endif
156 
157 class ThreadPoolParallelSection;
158 class ThreadPoolLoop;
159 
160 enum class StealAttemptKind {
161  TRY_ONE,
162  TRY_ALL,
163 };
164 
165 enum class PushResult {
166  REJECTED,
167  ACCEPTED_IDLE,
168  ACCEPTED_BUSY
169 };
170 
171 // Align to avoid false sharing with prior fields. If required,
172 // alignment or padding must be added subsequently to avoid false
173 // sharing with later fields. Note that:
174 //
175 // - The __x86_64__ value is twice the line size (64 bytes). This
176 // accounts for 2-line prefetch behavior on some cores.
177 //
178 // - Ideally, ORT_ALIGN_TO_AVOID_FALSE_SHARING is used. However, the
179 // definition of ThreadPoolParallelSection uses naive padding
180 // because C++11 does not support alignment constraints on
181 // allocation or expose stdlib.h aligned_alloc. C++17 introduces
182 // support for aligned allocation which we could use here.
183 
184 #if defined(__x86_64__)
185 #define ORT_FALSE_SHARING_BYTES 128
186 #else
187 #define ORT_FALSE_SHARING_BYTES 64
188 #endif
189 
190 #define ORT_ALIGN_TO_AVOID_FALSE_SHARING alignas(ORT_FALSE_SHARING_BYTES)
191 
192 struct PaddingToAvoidFalseSharing {
193  char padding[ORT_FALSE_SHARING_BYTES];
194 };
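// As an example of the two idioms above (illustrative only, not part of the
// original file), a hypothetical pair of frequently-updated counters can be
// kept off each other's cache line either with the alignment macro or with
// explicit padding:
//
//   struct ORT_ALIGN_TO_AVOID_FALSE_SHARING AlignedCounter {
//     std::atomic<uint64_t> value{0};
//   };
//
//   struct PaddedCounters {
//     std::atomic<uint64_t> a{0};
//     PaddingToAvoidFalseSharing padding;  // keeps b on a different line from a
//     std::atomic<uint64_t> b{0};
//   };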
195 
196 /* Usage:
197 1. In executor, call Start() before profiling and Stop() to get profiled numbers;
198 2. Inside thread pool, call LogStart() before the section of interest and LogEnd... after it to log elapsed time;
199 3. To extend, just add more events in enum ThreadPoolEvent before MAX_EVENT, and update GetEventName(...) accordingly;
200 4. Note LogStart must pair with either LogEnd or LogEndAndStart, otherwise ORT_ENFORCE will fail;
201 5. ThreadPoolProfiler is thread-safe.
202 */
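/* Example of the sequence above (illustrative only, not part of the original
   file); `pool_name` is a placeholder const CHAR_TYPE* and WAIT is one of the
   events declared below:

     ThreadPoolProfiler profiler(4, pool_name);
     profiler.Start();                            // 1. executor starts profiling
     profiler.LogStart();                         // 2. main thread marks a start point
     // ... section of interest, e.g. waiting for workers ...
     profiler.LogEnd(ThreadPoolProfiler::WAIT);   //    ... and logs the elapsed time
     std::string stats = profiler.Stop();         // 1. executor collects the numbers
*/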
203 #ifdef ORT_MINIMAL_BUILD
204 class ThreadPoolProfiler {
205  public:
206  enum ThreadPoolEvent {
207  DISTRIBUTION = 0,
208  DISTRIBUTION_ENQUEUE,
209  RUN,
210  WAIT,
211  WAIT_REVOKE,
212  MAX_EVENT
213  };
214  ThreadPoolProfiler(int, const CHAR_TYPE*){};
215  ~ThreadPoolProfiler() = default;
217  void Start(){};
218  std::string Stop() { return "not available for minimal build"; }
219  void LogStart(){};
220  void LogEnd(ThreadPoolEvent){};
221  void LogEndAndStart(ThreadPoolEvent){};
222  void LogStartAndCoreAndBlock(std::ptrdiff_t){};
223  void LogCoreAndBlock(std::ptrdiff_t){};
224  void LogThreadId(int){};
225  void LogRun(int){};
226  std::string DumpChildThreadStat() { return {}; }
227 };
228 #else
229 class ThreadPoolProfiler {
230  public:
231  enum ThreadPoolEvent {
232  DISTRIBUTION = 0,
233  DISTRIBUTION_ENQUEUE,
234  RUN,
235  WAIT,
236  WAIT_REVOKE,
237  MAX_EVENT
238  };
239  ThreadPoolProfiler(int num_threads, const CHAR_TYPE* thread_pool_name);
240  ~ThreadPoolProfiler();
241  ORT_DISALLOW_COPY_ASSIGNMENT_AND_MOVE(ThreadPoolProfiler);
242  using Clock = std::chrono::high_resolution_clock;
243  void Start(); //called by executor to start profiling
244  std::string Stop(); //called by executor to stop profiling and return collected numbers
245  void LogStart(); //called in main thread to record the starting time point
246  void LogEnd(ThreadPoolEvent); //called in main thread to calculate and save the time elapsed from last start point
247  void LogEndAndStart(ThreadPoolEvent); //called in main thread to save the elapsed time and immediately start a new measurement
248  void LogStartAndCoreAndBlock(std::ptrdiff_t block_size);
249  void LogCoreAndBlock(std::ptrdiff_t block_size); //called in main thread to log core and block size for task breakdown
250  void LogThreadId(int thread_idx); //called in child thread to log its id
251  void LogRun(int thread_idx); //called in child thread to log num of run
252  std::string DumpChildThreadStat(); //return all child statistics collected so far
253 
254  private:
255  static const char* GetEventName(ThreadPoolEvent);
256  struct MainThreadStat {
257  uint64_t events_[MAX_EVENT] = {};
258  int32_t core_ = -1;
259  std::vector<std::ptrdiff_t> blocks_; //block size determined by cost model
260  std::vector<onnxruntime::TimePoint> points_;
261  void LogCore();
262  void LogBlockSize(std::ptrdiff_t block_size);
263  void LogStart();
264  void LogEnd(ThreadPoolEvent);
265  void LogEndAndStart(ThreadPoolEvent);
266  std::string Reset();
267  };
268  bool enabled_ = false;
269  MainThreadStat& GetMainThreadStat(); //return thread local stat
270  int num_threads_;
271 #ifdef _MSC_VER
272 #pragma warning(push)
273 // C4324: structure was padded due to alignment specifier
274 #pragma warning(disable : 4324)
275 #endif // _MSC_VER
276  struct ORT_ALIGN_TO_AVOID_FALSE_SHARING ChildThreadStat {
277  std::thread::id thread_id_;
278  uint64_t num_run_ = 0;
279  onnxruntime::TimePoint last_logged_point_ = Clock::now();
280  int32_t core_ = -1; //core that the child thread is running on
281  };
282 #ifdef _MSC_VER
283 #pragma warning(pop)
284 #endif // _MSC_VER
285  std::vector<ChildThreadStat> child_thread_stats_;
286  std::string thread_pool_name_;
287 };
288 #endif
289 
290 // Extended Eigen thread pool interface, avoiding the need to modify
291 // the ThreadPoolInterface.h header from the external Eigen
292 // repository.
293 
294 class ExtendedThreadPoolInterface : public Eigen::ThreadPoolInterface {
295  public:
296  // Start/end a parallel section, within which calls to
297  // RunInParallelSection may be made. Parallel sections are
298  // non-nesting.
299  virtual void StartParallelSection(ThreadPoolParallelSection& ps) = 0;
300  virtual void EndParallelSection(ThreadPoolParallelSection& ps) = 0;
301 
302  // Run fn with up to n degree-of-parallelism enlisting the thread
303  // pool for help. The degree-of-parallelism includes the caller,
304  // and so if n==1 then the function will run directly in the caller.
305  //
306  // The fork-join synchronization is handled in the thread pool, and
307  // so any state captured by fn() is safe from concurrent access once
308  // RunInParallelSection returns.
309  //
310  // The parameter idx provides a loop-local thread ID in the range
311  // [0,k) where k<=n.
312  virtual void RunInParallelSection(ThreadPoolParallelSection& ps,
313  std::function<void(unsigned idx)> fn,
314  unsigned n, std::ptrdiff_t block_size) = 0;
315 
316  // Special case alternative to RunInParallelSection for use without
317  // an existing parallel section. Ideally we would use a single
318  // implementation and a stack-allocated ThreadPoolParallelSection.
319  //
320  // However, on the BM_ThreadPoolParallelFor micro-benchmark I saw
321  // ~20% overhead on the resulting single-loop parallel sections.
322  // There are some additional costs (~5%) for additional invocations
323  // through lambda functions on loop entry. Most significantly, on
324  // loop exit, we incurred ~15% cost by no longer being able to
325  // overlap clean-up of unused Task objects in EndParallelSection
326  // with waiting for loop iterations to complete.
327  //
328  // [ Note that this 20% overhead is more than paid for when we have
329  // two loops execute in series in a parallel section. ]
330  virtual void RunInParallel(std::function<void(unsigned idx)> fn,
331  unsigned n, std::ptrdiff_t block_size) = 0;
332  virtual void StartProfiling() = 0;
333  virtual std::string StopProfiling() = 0;
334 };
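// As an illustration of the intended call sequence (not part of the original
// file; `pool` is a placeholder ExtendedThreadPoolInterface& and the loop
// bodies and dop/block_size values are arbitrary):
//
//   ThreadPoolParallelSection ps;
//   pool.StartParallelSection(ps);
//   pool.RunInParallelSection(ps, [&](unsigned idx) { /* loop 1 */ }, /*n=*/4, /*block_size=*/1);
//   pool.RunInParallelSection(ps, [&](unsigned idx) { /* loop 2 */ }, /*n=*/4, /*block_size=*/1);
//   pool.EndParallelSection(ps);  // fork-join: all section work has finished on return
//
// The workers summoned for the first loop stay enlisted in the section and are
// re-used by the second loop, which is the cost amortization described above.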
335 
336 class ThreadPoolParallelSection {
337  public:
338  // State accessed only by the main thread
339  // --------------------------------------
340 
341  // Tasks successfully submitted to the work queues. This sets the
342  // maximum degree of parallelism that the section will support.
343  InlinedVector<std::pair<int, unsigned>> tasks;
344 
345  // Number of tasks revoked (i.e., removed from the queues prior to
346  // execution). We count this at various points, and omit waiting
347  // for them at the end of a loop.
348  unsigned tasks_revoked{0};
349 
350  // Current degree of parallelism, including work in the main thread
351  // and in the dispatcher.
352  unsigned current_dop{0};
353 
354  // State shared between the main thread and worker threads
355  // -------------------------------------------------------
356 
357  // Flag to signal termination of the parallel section
358  std::atomic<bool> active{false};
359 
360  // Count of the number of tasks that completed normally. Other
361  // tasks may be running currently, or may be present in work queues,
362  // or may have been removed from the queues by
363  // RunQueue::RevokeWithTag.
364  PaddingToAvoidFalseSharing padding_1;
365  std::atomic<unsigned> tasks_finished{0};
366  PaddingToAvoidFalseSharing padding_2;
367 
368  // If non-null, the current loop that tasks should be executing. We
369  // need to be careful on access to the contents of current_loop
370  // because it can be stack allocated on the thread entering the
371  // loop:
372  //
373  // - Readers increment workers_in_loop and then read current_loop
374  //
375  // - Writers wishing to deallocate *current_loop must first clear
376  // current_loop and then wait for workers_in_loop==0
377  std::atomic<ThreadPoolLoop*> current_loop{nullptr};
378  std::atomic<unsigned> workers_in_loop{0};
379 
380  // Members to track asynchronous dispatching
381  int dispatch_q_idx = -1; // index of thread that dispatches work to all other threads
382  unsigned dispatch_w_idx = 0; // index of enqueued work
383  std::atomic<bool> dispatch_started{false};
384  std::atomic<bool> dispatch_done{false};
385  std::atomic<bool> work_done{false};
386 };
387 
388 class ThreadPoolLoop {
389  public:
390  ThreadPoolLoop(std::function<void(unsigned)> f, unsigned t) : fn(std::move(f)), threads_needed(t) {
391  }
392 
393  const std::function<void(unsigned)> fn;
394  const unsigned threads_needed;
395 
396  private:
397  ORT_DISALLOW_COPY_ASSIGNMENT_AND_MOVE(ThreadPoolLoop);
398 };
399 
400 template <typename Work, typename Tag, unsigned kSize>
401 class RunQueue {
402  public:
403  RunQueue() : front_(0), back_(0) {
404  // require power-of-two for fast masking
405  assert((kSize & (kSize - 1)) == 0);
406  assert(kSize > 2); // why would you do this?
407  assert(kSize <= (64 << 10)); // leave enough space for counter
408  for (unsigned i = 0; i < kSize; i++) array_[i].state.store(ElemState::kEmpty, std::memory_order_relaxed);
409  }
410 
411  ~RunQueue() {
412  assert(Size() == 0);
413  }
414 
415  // PopFront removes and returns the first element in the queue.
416  // If the queue was empty returns default-constructed Work.
417  Work PopFront() {
418  unsigned front;
419  Elem* e;
420  ElemState s;
421 
422  // Drain revoked items from the front of the queue. CAS to busy to synchronize with
423  // any attempt to take the same item from the back of the queue.
424  do {
425  front = front_.load(std::memory_order_relaxed);
426  e = &array_[(front - 1) & kMask];
427  s = e->state.load(std::memory_order_relaxed);
428  if (s == ElemState::kRevoked &&
429  e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
430  e->state.store(ElemState::kEmpty, std::memory_order_release);
431  front = ((front - 1) & kMask2) | (front & ~kMask2);
432  front_.store(front, std::memory_order_relaxed);
433  }
434  } while (s == ElemState::kRevoked);
435 
436  // Attempt to take next item. State kEmpty shows the queue is empty, kBusy shows
437  // the work is in progress on the item at the front of the queue.
438  if (s != ElemState::kReady ||
439  !e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
440  return Work();
441  Work w = std::move(e->w);
442  e->tag = Tag();
443  e->state.store(ElemState::kEmpty, std::memory_order_release);
444  front = ((front - 1) & kMask2) | (front & ~kMask2);
445  front_.store(front, std::memory_order_relaxed);
446  return w;
447  }
448 
449  // PushBack adds w at the end of the queue.
450  // If queue is full returns w, otherwise returns default-constructed Work.
451  Work PushBack(Work w) {
452  std::lock_guard<OrtMutex> lock(mutex_);
453  unsigned back = back_.load(std::memory_order_relaxed);
454  Elem& e = array_[(back - 1) & kMask];
455  ElemState s = e.state.load(std::memory_order_relaxed);
456  if (s != ElemState::kEmpty ||
457  !e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
458  return w;
459  back = ((back - 1) & kMask2) | (back & ~kMask2);
460  back_.store(back, std::memory_order_relaxed);
461  e.w = std::move(w);
462  e.tag = Tag();
463  e.state.store(ElemState::kReady, std::memory_order_release);
464  return Work();
465  }
466 
467  // PushBackWithTag adds w at the end of the queue. The tag value can be used on a
468  // subsequent call to RevokeWithTag to remove the item from the queue in combination
469  // with w_idx. Typically the tag will be a per-thread ID to distinguish work
470  // submitted from different threads.
471  PushResult PushBackWithTag(Work w, Tag tag, unsigned& w_idx) {
472  std::lock_guard<OrtMutex> lock(mutex_);
473  unsigned back = back_.load(std::memory_order_relaxed);
474  w_idx = (back - 1) & kMask;
475  Elem& e = array_[w_idx];
476  ElemState s = e.state.load(std::memory_order_relaxed);
477  if (s != ElemState::kEmpty ||
478  !e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
479  return PushResult::REJECTED; /* Not enqueued */
480  bool was_ready = (((back ^ (front_.load(std::memory_order_relaxed))) & kMask) == 0);
481  back = ((back - 1) & kMask2) | (back & ~kMask2);
482  back_.store(back, std::memory_order_relaxed);
483  e.w = std::move(w);
484  e.tag = tag;
485  e.state.store(ElemState::kReady, std::memory_order_release);
486  return was_ready ? PushResult::ACCEPTED_IDLE : PushResult::ACCEPTED_BUSY; /* Enqueued */
487  }
488 
489  // PopBack removes and returns the last element in the queue.
490  Work PopBack() {
491  if (Empty())
492  return Work();
493  std::lock_guard<OrtMutex> lock(mutex_);
494  unsigned back;
495  Elem* e;
496  ElemState s;
497 
498  // Drain revoked items from the back of the queue. CAS to busy to synchronize with
499  // any attempt to take the same item from the front of the queue.
500  do {
501  back = back_.load(std::memory_order_relaxed);
502  e = &array_[back & kMask];
503  s = e->state.load(std::memory_order_relaxed);
504  if (s == ElemState::kRevoked &&
505  e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
506  e->state.store(ElemState::kEmpty, std::memory_order_release);
507  back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
508  }
509  } while (s == ElemState::kRevoked);
510 
511  if (s != ElemState::kReady ||
512  !e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
513  return Work();
514  Work w = std::move(e->w);
515  e->tag = Tag();
516  e->state.store(ElemState::kEmpty, std::memory_order_release);
517  back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
518  return w;
519  }
520 
521  // RevokeWithTag removes a work item from the queue. Items are identified positionally,
522  // and so a tag is used to detect whether the same position is occupied by a
523  // different work item at the time of removal. RevokeWithTag lets threads offer work
524  // for parallel execution, and then revoke the offer prior to the work executing (for
525  // instance if the thread itself completes all of the work). Revoking the work
526  // lets the thread deallocate state that might otherwise have been captured by the work item
527  // and accessed by it.
528  //
529  // Return true iff the item is successfully revoked. If the item is not revoked then
530  // the caller must assume that it may still execute, for instance because it
531  // has been pop'd from the queue concurrent with the revocation request.
532 
533  bool RevokeWithTag(Tag tag, unsigned w_idx) {
534  bool revoked = false;
535  std::lock_guard<OrtMutex> lock(mutex_);
536  Elem& e = array_[w_idx];
537  ElemState s = e.state.load(std::memory_order_relaxed);
538 
539  // We have acquired a lock on the queue, synchronizing with
540  // operations aside from the PopFront fast-path. Synchronize with
541  // that by attempting the same kReady->kBusy transition via CAS.
542 
543  if (s == ElemState::kReady &&
544  e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
545  if (e.tag == tag) {
546  unsigned back = back_.load(std::memory_order_relaxed);
547  unsigned back_idx = back & kMask;
548  if (back_idx != w_idx) {
549  // Item is not at the back of the queue, mark it in-place as revoked
550  e.tag = Tag();
551  e.w = Work();
552  e.state.store(ElemState::kRevoked, std::memory_order_release);
553  revoked = true;
554  } else {
555  // Item being removed is still at the back; shift the back pointer over it,
556  // and bump the version number.
557  e.tag = Tag();
558  e.w = Work();
559  e.state.store(ElemState::kEmpty, std::memory_order_release);
560  back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
561  revoked = true;
562  }
563  } else {
564  // Tag mismatch, i.e. work queue slot re-used
565  e.state.store(ElemState::kReady, std::memory_order_release);
566  }
567  }
568  return revoked;
569  }
570 
571  // Size returns current queue size.
572  // Can be called by any thread at any time.
573  unsigned Size() const {
574  return SizeOrNotEmpty<true>();
575  }
576 
577  // Empty tests whether container is empty.
578  // Can be called by any thread at any time.
579  bool Empty() const {
580  return SizeOrNotEmpty<false>() == 0;
581  }
582 
583  private:
584  static const unsigned kMask = kSize - 1;
585  static const unsigned kMask2 = (kSize << 1) - 1;
586 
587  enum class ElemState : uint8_t {
588  kEmpty,
589  kBusy,
590  kReady,
591  kRevoked,
592  };
593 
594  // Updates to an element are bracketed by a std::memory_order_acquire
595  // load from the state, and a std::memory_order_release store. Accesses
596  // to the front/back indices for the work queue use relaxed semantics,
597  // with the state of the elements being authoritative.
598  //
599  // TODO: Revisit whether there is a significant benefit for the current
600  // workloads in the complexity here.
601  struct Elem {
602  std::atomic<ElemState> state;
603  Tag tag;
604  Work w;
605  };
606 
607  OrtMutex mutex_;
608 
609  // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
610  // front/back, respectively. The remaining bits contain modification counters
611  // that are incremented on Push operations. This allows us to (1) distinguish
612  // between empty and full conditions (if we would use log(kSize) bits for
613  // position, these conditions would be indistinguishable); (2) obtain
614  // consistent snapshot of front_/back_ for Size operation using the
615  // modification counters.
616  ORT_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned> front_;
617  ORT_ALIGN_TO_AVOID_FALSE_SHARING std::atomic<unsigned> back_;
618  ORT_ALIGN_TO_AVOID_FALSE_SHARING Elem array_[kSize];
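  // Worked example of this encoding (illustrative, not part of the original
  // file), for kSize = 8: kMask = 7 selects an array slot and kMask2 = 15
  // holds the rolling position (2 * kSize states, so full and empty remain
  // distinguishable); the bits above kMask2 are the modification counter.
  //
  //   front_ = 0, back_ = 0   ->  ((front_ ^ back_) & kMask2) == 0, i.e. empty
  //   PushBack(w)             ->  w stored in array_[(0 - 1) & 7] == array_[7], back_ == 15
  //   Size()                  ->  ((0 & 15) - (15 & 15)) + 2 * kSize == 1
  //   PopFront()              ->  takes array_[7], front_ == 15, queue empty again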
619 
620  // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
621  // only whether the size is 0 is guaranteed to be correct.
622  // Can be called by any thread at any time.
623  template <bool NeedSizeEstimate>
624  unsigned SizeOrNotEmpty() const {
625  // Emptiness plays critical role in thread pool blocking. So we go to great
626  // effort to not produce false positives (claim non-empty queue as empty).
627  unsigned front = front_.load(std::memory_order_acquire);
628  for (;;) {
629  // Capture a consistent snapshot of front/back.
630  unsigned back = back_.load(std::memory_order_acquire);
631  unsigned front1 = front_.load(std::memory_order_relaxed);
632  if (front != front1) {
633  front = front1;
634  std::atomic_thread_fence(std::memory_order_acquire);
635  continue;
636  }
637  if (NeedSizeEstimate) {
638  return CalculateSize(front, back);
639  }
640  // This value will be 0 if the queue is empty, and undefined otherwise.
641  unsigned maybe_zero = ((front ^ back) & kMask2);
642  // Queue size estimate must agree with maybe zero check on the queue
643  // empty/non-empty state.
644  eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
645  return maybe_zero;
646  }
647  }
648 
649  EIGEN_ALWAYS_INLINE
650  unsigned CalculateSize(unsigned front, unsigned back) const {
651  int size = (front & kMask2) - (back & kMask2);
652  // Fix overflow.
653  if (size < 0)
654  size += 2 * kSize;
655  // Order of modification in push/pop is crafted to make the queue look
656  // larger than it is during concurrent modifications. E.g. push can
657  // increment size before the corresponding pop has decremented it.
658  // So the computed size can be up to kSize + 1, fix it.
659  if (size > static_cast<int>(kSize))
660  size = kSize;
661  return static_cast<unsigned>(size);
662  }
663 
664  RunQueue(const RunQueue&) = delete;
665  void operator=(const RunQueue&) = delete;
666 };
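// A minimal, self-contained sketch (not part of the original file) of the
// tag/revoke protocol above. The queue size (8), the tag type (uint32_t) and
// the tag value are arbitrary example choices; in the thread pool itself the
// tag is a per-thread Tag and the work items are parallel-section tasks.
inline void ExampleRunQueueRevoke() {
  RunQueue<std::function<void()>, uint32_t, 8> q;
  const uint32_t my_tag = 42;  // stand-in for a per-thread tag (0 is reserved for Schedule)
  unsigned w_idx = 0;

  // Offer a work item for execution; w_idx records the slot it occupies.
  PushResult r = q.PushBackWithTag([] { /* work */ }, my_tag, w_idx);

  if (r != PushResult::REJECTED) {
    // Suppose the submitting thread completed the work itself and now wants
    // to withdraw the offer before any worker picks the item up.
    if (q.RevokeWithTag(my_tag, w_idx)) {
      // Revoked: the item will never run, so any state it captured can be freed.
    } else {
      // Not revoked: a worker already claimed the item; it may still execute.
    }
  }
}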
667 
668 static std::atomic<uint32_t> next_tag{1};
669 
670 template <typename Environment>
671 class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInterface {
672  private:
673  struct PerThread;
674 
675  static unsigned WorkerLoop(int id, Eigen::ThreadPoolInterface* param) {
676  // unsafe downcast
677  ThreadPoolTempl* this_ptr = (ThreadPoolTempl*)param;
678  this_ptr->WorkerLoop(id);
679  return 0;
680  }
681 
682  ThreadPoolProfiler profiler_;
683 
684  void SignalAllAndWait() {
685  done_ = true;
686 
687  // Now if all threads block without work, they will start exiting.
688  // But note that threads can continue to work arbitrarily long,
689  // block, submit new work, unblock and otherwise live full life.
690  WakeAllWorkersForExit();
691  // Join threads explicitly (by destroying) to avoid destruction order within
692  // this class.
693  for (size_t i = 0; i < worker_data_.size(); ++i) worker_data_[i].thread.reset();
694  }
695 
696  public:
697  void StartProfiling() override {
698  profiler_.Start();
699  }
700 
701  std::string StopProfiling() override {
702  return profiler_.Stop();
703  }
704 
705  struct Tag {
706  constexpr Tag() : v_(0) {
707  }
708 
709  Tag(uint32_t v) : v_(v) {
710  }
711 
712  // Allocate a new tag to use to identify work items from a given
713  // thread in a parallel section. Ideally, threads will have
714  // unique tags, but re-use is not incorrect if the counter wraps
715  // (for instance, if a long-running workload is calling into ORT
716  // from a fresh thread for each request). We must not re-use the
717  // default tag 0 which is used to identify work items added via
718  // Schedule as opposed to requests for help in parallel sections.
719 
720  static Tag GetNext() {
721  Tag t = Tag(next_tag++);
722  if (t.v_ == 0) {
723  t = Tag(next_tag++);
724  }
725  return t;
726  }
727 
728  uint32_t Get() const {
729  return v_;
730  }
731 
732  bool operator==(Tag& other) const {
733  return v_ == other.v_;
734  }
735 
736  uint32_t v_ = 0;
737  };
738 
739  typedef std::function<void()> Task;
740  typedef RunQueue<Task, Tag, 1024> Queue;
741 
742  ThreadPoolTempl(const CHAR_TYPE* name, int num_threads, bool allow_spinning, Environment& env,
743  const ThreadOptions& thread_options)
744  : profiler_(num_threads, name),
745  env_(env),
746  num_threads_(num_threads),
747  allow_spinning_(allow_spinning),
748  set_denormal_as_zero_(thread_options.set_denormal_as_zero),
749  worker_data_(num_threads),
750  all_coprimes_(num_threads),
751  blocked_(0),
752  done_(false) {
753  // Calculate coprimes of all numbers [1, num_threads].
754  // Coprimes are used for random walks over all threads in Steal
755  // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
756  // a random starting thread index t and calculate num_threads - 1 subsequent
757  // indices as (t + coprime) % num_threads, we will cover all threads without
758  // repetitions (effectively getting a pseudo-random permutation of thread
759  // indices).
760  for (auto i = 1u; i <= num_threads_; ++i) {
761  all_coprimes_.emplace_back(i);
762  ComputeCoprimes(i, &all_coprimes_.back());
763  }
764 
765  // Eigen::MaxSizeVector has neither essential exception safety features
766  // such as swap, nor is it movable. So we have to join threads right here
767  // on exception
768  ORT_TRY {
769  worker_data_.resize(num_threads_);
770  for (auto i = 0u; i < num_threads_; i++) {
771  worker_data_[i].thread.reset(env_.CreateThread(name, i, WorkerLoop, this, thread_options));
772  }
773  } ORT_CATCH(...) {
774  ORT_HANDLE_EXCEPTION([&]() {
775  SignalAllAndWait();
776  throw;
777  });
778  }
779  }
780 
781  ~ThreadPoolTempl() override {
782  SignalAllAndWait();
783  }
784 
785  // Run fn(). Ordinarily, the function will be added to the thread pool and executed
786  // by a worker thread. If the thread pool rejects the work then fn() will instead
787  // execute synchronously during Schedule(fn). Currently the thread pool will only
788  // reject work if the queue of pending work is full.
789 
790  void Schedule(std::function<void()> fn) override {
791  PerThread* pt = GetPerThread();
792  int q_idx = Rand(&pt->rand) % num_threads_;
793  WorkerData& td = worker_data_[q_idx];
794  Queue& q = td.queue;
795  fn = q.PushBack(std::move(fn));
796  if (!fn) {
797  // The queue accepted the work; ensure that the thread will pick it up
798  td.EnsureAwake();
799  } else {
800  // Run the work directly if the queue rejected the work
801  fn();
802  }
803  }
804 
805  //......................................................................
806  //
807  // Parallel sections
808  // -----------------
809  //
810 
811  // Start a parallel section, using a caller-provided
812  // ThreadPoolParallelSection for maintaining the per-section state.
813  // Starting a parallel section is just book-keeping; threads are
814  // "summoned" to help with the parallel section once it enters
815  // parallel loops. The threads are then retained until the end of the
816  // section, being re-used over subsequent loops.
817 
818  void StartParallelSectionInternal(PerThread& pt,
819  ThreadPoolParallelSection& ps) {
820  assert((!pt.leading_par_section) && "Nested parallelism not supported");
821  assert((!ps.active) && "Starting parallel section, but active already");
822  pt.leading_par_section = true;
823  if (!pt.tag.Get()) {
824  pt.tag = Tag::GetNext();
825  }
826  ps.dispatch_q_idx = -1;
827  ps.dispatch_started = false;
828  ps.dispatch_done = false;
829  ps.work_done = false;
830  ps.tasks_revoked = 0;
831  ps.current_dop = 1;
832  ps.active = true;
833  }
834 
835  void StartParallelSection(ThreadPoolParallelSection& ps) override {
836  PerThread* pt = GetPerThread();
837  StartParallelSectionInternal(*pt, ps);
838  }
839 
840  // End a parallel section, waiting for all worker threads to exit from
841  // section. Hence, on return, the ThreadPoolParallelSection object
842  // can be deallocated.
843  void EndParallelSectionInternal(PerThread& pt,
844  ThreadPoolParallelSection& ps) {
845  assert((pt.leading_par_section) && "Ending parallel section, but none started");
846  assert((ps.active) && "Ending parallel section, but not active");
847  pt.leading_par_section = false;
848 
849  // Notify workers to exit from the section
850  ps.active = false;
851 
852  // First, attempt to revoke the dispatch task. If we succeed then
853  // we know we revoked _something_ pushed for the current loop. That
854  // may be the dispatch task itself, or it may be a task pushed by
855  // the dispatch task. Those cases are distinguished by whether or
856  // not the dispatch task itself has started -- if it has not started
857  // then it cannot have pushed tasks.
858  if (ps.dispatch_q_idx != -1) {
859  Queue& q = worker_data_[ps.dispatch_q_idx].queue;
860  if (q.RevokeWithTag(pt.tag, ps.dispatch_w_idx)) {
861  if (!ps.dispatch_started.load(std::memory_order_acquire)) {
862  // We successfully revoked a task, and saw the dispatch task
863  // not started. Hence we know we revoked the dispatch task.
864  // This should be the common case.
865  ps.dispatch_q_idx = -1;
866  } else {
867  // We successfully revoked a task, but saw the dispatch task
868  // had started. Hence we know we revoked one of the _new_
869  // tasks created by the dispatcher (not the dispatcher
870  // itself). This should be the rare case, but can occur if
871  // one of the tasks created by the dispatcher occupies the
872  // exact same slot in a work queue that the dispatcher used.
873  ps.tasks_revoked++;
874  }
875  }
876  }
877 
878  // Second, if we failed to revoke the dispatch task, wait for it to
879  // finish dispatch work. This avoids new tasks being started
880  // concurrently with us attempting to end the parallel section.
881  if (ps.dispatch_q_idx != -1) {
882  while (!ps.dispatch_done.load(std::memory_order_acquire)) {
883  onnxruntime::concurrency::SpinPause();
884  }
885  }
886 
887  // Now that we know dispatch is finished, we synchronize with the
888  // tasks that were created (if any) for the parallel section. We
889  // revoke tasks still in queues, and then wait for any that are
890  // still running.
891  profiler_.LogStart();
892  unsigned tasks_started = static_cast<unsigned>(ps.tasks.size());
893  while (!ps.tasks.empty()) {
894  const auto& item = ps.tasks.back();
895  Queue& q = worker_data_[item.first].queue;
896  if (q.RevokeWithTag(pt.tag, item.second)) {
897  ps.tasks_revoked++;
898  }
899  ps.tasks.pop_back();
900  }
901  profiler_.LogEnd(ThreadPoolProfiler::WAIT_REVOKE);
902 
903  // Wait for the dispatch task's own work...
904  if (ps.dispatch_q_idx > -1) {
905  while (!ps.work_done.load(std::memory_order_acquire)) {
906  onnxruntime::concurrency::SpinPause();
907  }
908  }
909 
910  // ...and wait for any other tasks not revoked to finish their work
911  auto tasks_to_wait_for = tasks_started - ps.tasks_revoked;
912  while (ps.tasks_finished < tasks_to_wait_for) {
913  onnxruntime::concurrency::SpinPause();
914  }
915 
916  // Clear status to allow the ThreadPoolParallelSection to be
917  // re-used.
918  ps.tasks_finished = 0;
919  }
920 
921  void EndParallelSection(ThreadPoolParallelSection& ps) override {
922  PerThread* pt = GetPerThread();
923  EndParallelSectionInternal(*pt, ps);
924  }
925 
926  //----------------------------------------------------------------------
927  //
928  // Preferred workers
929  // -----------------
930  //
931  // Initialize the set of hints for preferred worker threads we will
932  // use. We do this once, covering the maximum num_threads_ items,
933  // in order to avoid resizing preferred_workers concurrent with
934  // access from worker threads.
935  //
936  // For simplicity we initialize with hints round-robin among the
937  // workers. For simple workloads with 1 main thread this means we
938  // will distribute work across the pool of workers. For workloads
939  // with multiple main threads it attempts to balance the load.
940  //
941  // These hints are just used as a starting point, and are updated by
942  // the worker thread that actually claims an item (e.g., if an item
943  // initially assigned to thread T1 is stolen and executed by T2,
944  // then T2 is assigned as the new preferred worker).
945  //
946  // Note that the hints are held in the _main_ thread that submits
947  // work to the pool. We assume that a thread is primarily
948  // submitting work to just one pool, but allow for the pool to
949  // change over time. Hence we allow the hints vector to grow over
950  // time.
951  //
952  // A note on terminology used in the variable names here:
953  //
954  // dop - degree of parallelism, as seen by the user. For instance
955  // dop=4 means 4 threads in total: 1 main thread that enters the
956  // loop, plus 1 dispatcher thread, plus 2 additional worker
957  // threads.
958  //
959  // par_idx - a thread's index within the loop, in the range [0,dop).
960  //
961  // num_threads_ - the number of worker threads in the thread pool. A
962  // loop with dop=4 will be common on a pool with 3 threads
963  // (given that the main thread will also participate).
964  //
965  // q_idx - a worker queue index, in the range [0,num_threads_).
966  //
967  // preferred_workers - this maps from par_idx values to q_idx. Hence,
968  // with dop=4 the vector will have length 4, and will identify
969  // which of the workers (0,1,2) should run tasks for the loop.
970  // Note that mapping from par_idx values means that only slots
971  // [1,dop) are actually used in preferred_workers.
972  //
973  // Here are three examples, all assuming a machine with 4 h/w threads,
974  // and ORT configured to use dop=4.
975  //
976  // * First, suppose that a single job is running a series of loops.
977  // Its main thread enters a parallel loop. Initially, let's assume
978  // its preferred worker array is [_,0,1,2], writing "_" for the
979  // unused element for the par_idx=0 work that the main thread will
980  // run.
981  //
982  // The main thread schedules the dispatcher task onto worker 0.
983  //
984  // The dispatcher task schedules worker tasks onto workers 1 and 2.
985  //
986  // The tasks all execute, without any work stealing, on the threads
987  // they were scheduled on. The preferred worker array remains
988  // [_,0,1,2].
989  //
990  // * Next, assume we have the same job, and for whatever reason the
991  // preferred workers were initially [_,0,0,0].
992  //
993  // The main thread schedules the dispatcher onto worker 0.
994  //
995  // This dispatcher task runs on worker 0, and pushes the worker
996  // tasks back onto worker 0's queue.
997  //
998  // Workers 1 and 2 are idle, and steal tasks from worker 0. As the
999  // tasks run, they update the preferred_workers array to record the
1000  // workers that execute them.
1001  //
1002  // After the loop, the preferred worker array may now be [_,0,2,1]
1003  // or [_,0,1,2], reflecting the fact that the work has got
1004  // re-distributed. The next loop will start out by distributing the
1005  // work to those same workers.
1006  //
1007  // * Finally, let's assume we have two jobs running on two main
1008  // threads, and we are now using DoP=2 in the loops, and have 2
1009  // workers in the thread pool (so the machine is not
1010  // over-subscribed).
1011  //
1012  // Each main thread has its own preferred_workers, and
1013  // let's say initially these are both [_,0].
1014  //
1015  // Here, with DoP=2, each main thread will just dispatch a single
1016  // task immediately (there is no need for asynchrony with only one
1017  // task to generate).
1018  //
1019  // Initially both main threads will submit these tasks to worker 0.
1020  //
1021  // Once worker 1 steals one of these tasks, the task will update its
1022  // preferred worker to be 1.
1023  //
1024  // From that point onwards, the two main threads will dispatch tasks
1025  // to separate workers, avoiding the need for further work stealing.
1026 
1027  void InitializePreferredWorkers(InlinedVector<int>& preferred_workers) {
1028  static std::atomic<unsigned> next_worker{0};
1029 
1030  // preferred_workers[0] isn't supposed to be used, so initializing it with -1 to:
1031  // a) fault if inappropriately accessed
1032  // b) avoid wasting next_worker value
1033  if (preferred_workers.empty()) {
1034  preferred_workers.push_back(-1);
1035  }
1036 
1037  // preferred_workers maps from a par_idx to a q_idx, hence we
1038  // initialize slots in the range [0,num_threads_]
1039  while (preferred_workers.size() <= num_threads_) {
1040  preferred_workers.push_back(next_worker++ % num_threads_);
1041  }
1042  }
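  // Worked example (illustrative): with num_threads_ == 3 and an empty
  // preferred_workers, and assuming this thread observes next_worker counting
  // from 0, the loop above leaves preferred_workers == {-1, 0, 1, 2}: slot 0
  // is the unused par_idx=0 entry, and slots 1..3 spread the initial hints
  // round-robin across workers 0..2.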
1043 
1044  // Update the preferred worker for par_idx to be the calling thread
1045 
1046  void UpdatePreferredWorker(InlinedVector<int>& preferred_workers,
1047  unsigned par_idx) {
1048  unsigned ran_on_idx = GetPerThread()->thread_id;
1049  assert(ran_on_idx < num_threads_);
1050  assert(par_idx < preferred_workers.size());
1051  preferred_workers[par_idx] = ran_on_idx;
1052  }
1053 
1054  // Schedule [par_idx_start,par_idx_end) across the preferred workers
1055 
1056  void ScheduleOnPreferredWorkers(PerThread& pt,
1057  ThreadPoolParallelSection& ps,
1058  InlinedVector<int>& preferred_workers,
1059  unsigned par_idx_start,
1060  unsigned par_idx_end,
1061  std::function<void(unsigned)> worker_fn) {
1062  for (auto par_idx = par_idx_start; par_idx < par_idx_end; ++par_idx) {
1063  // Look up hint for par_idx. Note that the hints may have been
1064  // recorded from a prior thread pool with a different number of
1065  // threads, hence we must cap at num_threads_.
1066  assert(par_idx < preferred_workers.size());
1067  unsigned q_idx = preferred_workers[par_idx] % num_threads_;
1068  assert(q_idx < num_threads_);
1069  WorkerData& td = worker_data_[q_idx];
1070  Queue& q = td.queue;
1071  unsigned w_idx;
1072 
1073  // Attempt to enqueue the task
1074  auto push_status = q.PushBackWithTag([worker_fn, par_idx, &preferred_workers, &ps, this]() {
1075  // Record the worker thread that actually runs this task.
1076  // This will form the preferred worker for the next loop.
1077  UpdatePreferredWorker(preferred_workers, par_idx);
1078  worker_fn(par_idx);
1079  ps.tasks_finished++;
1080  },
1081  pt.tag, w_idx);
1082 
1083  // Queue accepted the task; wake the thread that owns the queue.
1084  // In addition, if the queue was non-empty, attempt to wake
1085  // another thread (which may then steal the task).
1086  if (push_status == PushResult::ACCEPTED_IDLE || push_status == PushResult::ACCEPTED_BUSY) {
1087  ps.tasks.push_back({q_idx, w_idx});
1088  td.EnsureAwake();
1089  if (push_status == PushResult::ACCEPTED_BUSY) {
1090  worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake();
1091  }
1092  }
1093  }
1094  }
1095 
1096  //......................................................................
1097  //
1098  // Parallel loops
1099  // --------------
1100  //
1101  // Ensure that the ThreadPoolParallelSection has sufficient workers to
1102  // execute a loop with degree of parallelism n. We track the number
1103  // of workers already available to the parallel section, prior to
1104  // submitting tasks to the work queues to make up the total.
1105  //
1106  // Each worker will call in to worker_fn(idx) with a per-worker thread
1107  // ID. Note there are different levels of indirection here:
1108  //
1109  // - In a single-loop parallel section, worker_fn will directly
1110  // execute the threadpool.cc code that implements the parallel loop.
1111  //
1112  // - In a multi-loop parallel section, worker_fn is an intermediate
1113  // function that is long-lived (i.e., that lasts until the end of
1114  // the parallel section, as opposed to just a single loop's
1115  // duration).
1116  //
1117  // For ordinary parallel sections, RunInParallelInternal dispatches
1118  // tasks to a number of workers asynchronously. A worker thread will
1119  // be selected as the dispatcher that distributes tasks. This removes
1120  // the O(n) work off the critical path of starting the first loop
1121  // iteration, helping maintain good performance on very short loops.
1122  //
1123  // See the note on terminology above for the use of variable names
1124  // here.
1125 
1126  void RunInParallelInternal(PerThread& pt,
1127  ThreadPoolParallelSection& ps,
1128  unsigned new_dop,
1129  bool dispatch_async,
1130  std::function<void(unsigned)> worker_fn) {
1131  // Ensure that the vector of preferred workers is sufficient for the
1132  // size of the loop we are entering. We do this before dispatching
1133  // tasks for the loop in order to avoid any races between changes to
1134  // the size of the vector and recording the locations that tasks run
1135  // in as they complete.
1136  assert(new_dop <= (unsigned)(num_threads_ + 1));
1137  auto& preferred_workers = pt.preferred_workers;
1138  InitializePreferredWorkers(preferred_workers);
1139 
1140  // current_dop is the degree of parallelism via any workers already
1141  // participating in the current parallel section. Usually, for
1142  // single-loop parallel sections, current_dop=1.
1143  unsigned current_dop = ps.current_dop;
1144 
1145  if (current_dop < new_dop) {
1146  unsigned extra_needed = new_dop - current_dop;
1147 
1148  // Attempt to summon additional workers asynchronously if we
1149  // need more than one. Otherwise, we fall back to simple
1150  // synchronous scheduling.
1151  if (dispatch_async && extra_needed > 1) {
1152  assert(current_dop == 1);
1153 
1154  // Task for dispatching work asynchronously.
1155  Task dispatch_task = [current_dop, new_dop, worker_fn, &preferred_workers, &ps, &pt, this]() {
1156  // Record that dispatch work has started. This must occur
1157  // prior to scheduling tasks, in order to synchronize with
1158  // EndParallelSectionInternal. [ If EndParallelSection
1159  // revoked a task, and then sees dispatch_started=false, then
1160  // it knows that it revoked the dispatcher. Conversely, if it
1161  // revokes a task, and then sees dispatch_started=true, then
1162  // it knows it revoked a worker task. ]
1163  ps.dispatch_started.store(true, std::memory_order_seq_cst);
1164 
1165  // Schedule tasks par_idx=[current_dop+1,new_dop)
1166  ScheduleOnPreferredWorkers(pt, ps, preferred_workers, current_dop + 1, new_dop, worker_fn);
1167  ps.dispatch_done.store(true, std::memory_order_release);
1168 
1169  // Record the worker thread that actually runs this task.
1170  // This will form the preferred worker for the next loop.
1171  UpdatePreferredWorker(preferred_workers, current_dop);
1172 
1173  // Run dispatcher task's own work, par_idx=current_dop
1174  worker_fn(current_dop);
1175 
1176  // Dispatcher's work complete
1177  ps.work_done.store(true, std::memory_order_release);
1178  };
1179 
1180  profiler_.LogStart();
1181  ps.dispatch_q_idx = preferred_workers[current_dop] % num_threads_;
1182  WorkerData& dispatch_td = worker_data_[ps.dispatch_q_idx];
1183  Queue& dispatch_que = dispatch_td.queue;
1184 
1185  // assign dispatch task to selected dispatcher
1186  auto push_status = dispatch_que.PushBackWithTag(dispatch_task, pt.tag, ps.dispatch_w_idx);
1187  // Queue accepted the task; wake the thread that owns the queue.
1188  // In addition, if the queue was non-empty, attempt to wake
1189  // another thread (which may then steal the task).
1190  if (push_status == PushResult::ACCEPTED_IDLE || push_status == PushResult::ACCEPTED_BUSY) {
1191  dispatch_td.EnsureAwake();
1192  if (push_status == PushResult::ACCEPTED_BUSY) {
1193  worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake();
1194  }
1195  } else {
1196  ps.dispatch_q_idx = -1; // failed to enqueue dispatch_task
1197  }
1198  profiler_.LogEnd(ThreadPoolProfiler::DISTRIBUTION_ENQUEUE);
1199  } else {
1200  // Synchronous dispatch
1201  ScheduleOnPreferredWorkers(pt, ps, preferred_workers, current_dop, new_dop, std::move(worker_fn));
1202  }
1203  ps.current_dop = new_dop;
1204  }
1205  }
1206 
1207  // Run a single parallel loop in an existing parallel section. This
1208  // maps directly onto SummonWorkers to create sufficient worker
1209  // threads for the desired degree of parallelism, followed by
1210  // dispatching the loop to those workers.
1211  void RunInParallelSection(ThreadPoolParallelSection& ps,
1212  std::function<void(unsigned idx)> fn,
1213  unsigned n,
1214  std::ptrdiff_t block_size) override {
1215  ORT_ENFORCE(n <= num_threads_ + 1, "More work items than threads");
1216  profiler_.LogStartAndCoreAndBlock(block_size);
1217  PerThread* pt = GetPerThread();
1218  assert(pt->leading_par_section && "RunInParallel, but not in parallel section");
1219  assert((n > 1) && "Trivial parallel section; should be avoided by caller");
1220 
1221  // Publish the work to any existing workers in the parallel
1222  // section, and ensure it is visible to any new threads created
1223  // below.
1224  assert((!ps.current_loop) && "RunInParallelSection, but loop already active");
1225  ThreadPoolLoop loop{std::move(fn), n};
1226  ps.current_loop = &loop;
1227 
1228  // Increase the worker count if needed. Each worker will pick up
1229  // loops to execute from the current parallel section.
1230  std::function<void(unsigned)> worker_fn = [&ps](unsigned par_idx) {
1231  while (ps.active) {
1232  if (ps.current_loop.load() == nullptr) {
1233  onnxruntime::concurrency::SpinPause();
1234  } else {
1235  ps.workers_in_loop++;
1236  ThreadPoolLoop* work_item = ps.current_loop;
1237  if (work_item && par_idx < work_item->threads_needed) {
1238  work_item->fn(par_idx);
1239  }
1240  ps.workers_in_loop--;
1241  }
1242  }
1243  };
1244  RunInParallelInternal(*pt, ps, n, false, std::move(worker_fn));
1245  assert(ps.dispatch_q_idx == -1);
1246  profiler_.LogEndAndStart(ThreadPoolProfiler::DISTRIBUTION);
1247 
1248  // Run work in the main thread
1249  loop.fn(0);
1250  profiler_.LogEndAndStart(ThreadPoolProfiler::RUN);
1251 
1252  // Wait for workers to exit the loop
1253  ps.current_loop = 0;
1254  while (ps.workers_in_loop) {
1255  onnxruntime::concurrency::SpinPause();
1256  }
1257  profiler_.LogEnd(ThreadPoolProfiler::WAIT);
1258  }
1259 
1260  // Run a single parallel loop _without_ a parallel section. This is a
1261  // special case of RunInParallelSection, avoiding code paths for
1262  // handing off multiple loops to the pool of workers.
1263  // For main thread:
1264  // 1. select a dispatcher and do job distribution;
1265  // 2. run fn(0);
1266  // 3. wait for all;
1267  // For dispatcher:
1268  // 1. distribute jobs to all other threads;
1269  // 2. run fn(...) itself.
1270  // For all other threads:
1271  // 1. run fn(...);
1272  void RunInParallel(std::function<void(unsigned idx)> fn, unsigned n, std::ptrdiff_t block_size) override {
1273  ORT_ENFORCE(n <= num_threads_ + 1, "More work items than threads");
1274  profiler_.LogStartAndCoreAndBlock(block_size);
1275  PerThread* pt = GetPerThread();
1276  ThreadPoolParallelSection ps;
1277  StartParallelSectionInternal(*pt, ps);
1278  RunInParallelInternal(*pt, ps, n, true, fn); // select dispatcher and do job distribution;
1279  profiler_.LogEndAndStart(ThreadPoolProfiler::DISTRIBUTION);
1280  fn(0); // run fn(0)
1281  profiler_.LogEndAndStart(ThreadPoolProfiler::RUN);
1282  EndParallelSectionInternal(*pt, ps); // wait for all
1283  profiler_.LogEnd(ThreadPoolProfiler::WAIT);
1284  }
1285 
1286  int NumThreads() const final {
1287  return num_threads_;
1288  }
1289 
1290  int CurrentThreadId() const final {
1291  const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
1292  if (pt->pool == this) {
1293  return pt->thread_id;
1294  }
1295  return -1;
1296  }
1297 
1298  void EnableSpinning() {
1299  spin_loop_status_ = SpinLoopStatus::kBusy;
1300  }
1301 
1302  void DisableSpinning() {
1303  spin_loop_status_ = SpinLoopStatus::kIdle;
1304  }
1305 
1306  private:
1307  void ComputeCoprimes(int N, Eigen::MaxSizeVector<unsigned>* coprimes) {
1308  for (int i = 1; i <= N; i++) {
1309  unsigned a = i;
1310  unsigned b = N;
1311  // If GCD(a, b) == 1, then a and b are coprimes.
1312  while (b != 0) {
1313  unsigned tmp = a;
1314  a = b;
1315  b = tmp % b;
1316  }
1317  if (a == 1) {
1318  coprimes->push_back(i);
1319  }
1320  }
1321  }
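  // Worked example (illustrative): ComputeCoprimes(4, ...) yields {1, 3}.
  // A steal walk that starts at a random thread t and repeatedly adds the
  // coprime 3 modulo 4 visits t, t+3, t+2, t+1 (mod 4), i.e. every thread
  // exactly once before repeating, which is the property the constructor
  // comment above relies on.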
1322 
1323  typedef typename Environment::EnvThread Thread;
1324  struct WorkerData;
1325 
1326  // PerThread objects are allocated in thread-local storage and
1327  // allocated on the thread's first call to GetPerThread. PerThread
1328  // objects are allocated for all threads that submit work to the
1329  // thread pool, in addition to threads within the pool.
1330  //
1331  // In contrast, the WorkerData objects are allocated only for the
1332  // threads in the pool, and their lifetime is managed along with the
1333  // pool.
1334 
1335 #ifdef _MSC_VER
1336 #pragma warning(push)
1337 // C4324: structure was padded due to alignment specifier
1338 #pragma warning(disable : 4324)
1339 #endif // _MSC_VER
1340 
1341  struct ORT_ALIGN_TO_AVOID_FALSE_SHARING PerThread {
1342  constexpr PerThread() : pool(nullptr) {
1343  }
1344  ThreadPoolTempl* pool; // Parent pool, or null for normal threads.
1345  bool initialized{false}; // Non-trivial initialization ran (e.g. for RNG)
1346  uint64_t rand{0}; // Random generator state.
1347  int thread_id{-1}; // Worker thread index in pool.
1348  Tag tag{}; // Work item tag used to identify this thread.
1349  bool leading_par_section{false}; // Leading a parallel section (used only for asserts)
1350 
1351  // When this thread is entering a parallel section, it will
1352  // initially push work to this set of workers. The aim is to
1353  // retain cache state within the workers, and to reduce the number
1354  // of times that the work-stealing code paths are used for
1355  // rebalancing.
1356  InlinedVector<int> preferred_workers;
1357  };
1358 
1359 #ifdef _MSC_VER
1360 #pragma warning(pop)
1361 #endif // _MSC_VER
1362 
1363 
1364  struct WorkerData {
1365  constexpr WorkerData() : thread(), queue() {
1366  }
1367  std::unique_ptr<Thread> thread;
1368  Queue queue;
1369 
1370  // Each thread has a status, available read-only without locking, and protected
1371  // by the mutex field below for updates. The status is used for three
1372  // purposes:
1373  //
1374  // 1. To identify threads that are good candidates to push work to.
1375  // We prefer to push work to threads that are actively spinning (no need
1376  // for an OS wake-up, and no need for current work to finish). After that, we
1377  // prefer to push work to threads that are blocked (no need to wait for the
1378  // current work to finish).
1379  //
1380  // 2. To identify threads that are good candidates to steal work from. We
1381  // prefer to steal work from threads that are active outside the worker loop.
1382  // This avoids "snatching" new work away from a thread that has just been
1383  // given it but not yet noticed.
1384  //
1385  // 3. When pushing work to a thread, we use the status read-only to identify
1386  // when we need to wake the thread. This read-only check avoids the
1387  // need for mutex / condvar operations in the case where the thread pool
1388  // remains busy.
1389 
1390  enum class ThreadStatus : uint8_t {
1391  Spinning, // Spinning in the work loop, and other cases (initialization) where
1392  // the thread will soon be in the loop
1393  Active, // Running user code, not waiting for work
1394  Blocking, // In the process of blocking; may no longer notice work pushed to it
1395  Blocked, // Blocked on cv
1396  Waking, // Not yet back in the worker loop, but wake-up notification sent
1397  };
1398 
1399  ThreadStatus GetStatus() const {
1400  return status;
1401  }
1402 
1403  // State transitions, called from other threads
1404 
1405  // We employ mutex for synchronizing on Blocked/Waking state (EnsureAwake/SetBlocked)
1406  // to wake up the thread in the event it goes to sleep. Because thread status
1407  // is an atomic member the lock is not necessary to update it.
1408  // Thus, we do not obtain the mutex when we set Active/Spinning state for the thread.
1409  // While manipulating under the mutex, we employ relaxed semantics so the compiler is not restricted
1410  // any further.
1411  void EnsureAwake() {
1412  ThreadStatus seen = GetStatus();
1413  if (seen == ThreadStatus::Blocking ||
1414  seen == ThreadStatus::Blocked) {
1415  std::unique_lock<OrtMutex> lk(mutex);
1416  // Blocking state exists only transiently during the SetBlocked() method
1417  // while holding the lock. We may observe it at the start of this
1418  // function, but after acquiring the lock then the target thread
1419  // will either be blocked or not.
1420  seen = status.load(std::memory_order_relaxed);
1421  assert(seen != ThreadStatus::Blocking);
1422  if (seen == ThreadStatus::Blocked) {
1423  status.store(ThreadStatus::Waking, std::memory_order_relaxed);
1424  lk.unlock();
1425  cv.notify_one();
1426  }
1427  }
1428  }
1429 
1430  // State transitions, called only from the thread itself
1431  // The lock is only used in the synchronization between EnsureAwake and SetBlocked,
1432  // while the Active vs Spinning states are just used as a hint for work stealing
1433  // (prefer to steal from a thread that is actively running a task, rather than stealing from
1434  // a thread that is spinning and likely to pick up the task itself).
1435  void SetActive() {
1436  status = ThreadStatus::Active;
1437  }
1438 
1439  void SetSpinning() {
1440  status = ThreadStatus::Spinning;
1441  }
1442 
1443  void SetBlocked(std::function<bool()> should_block,
1444  std::function<void()> post_block) {
1445  std::unique_lock<OrtMutex> lk(mutex);
1446  assert(GetStatus() == ThreadStatus::Spinning);
1447  status.store(ThreadStatus::Blocking, std::memory_order_relaxed);
1448  if (should_block()) {
1449  status.store(ThreadStatus::Blocked, std::memory_order_relaxed);
1450  do {
1451  cv.wait(lk);
1452  } while (status.load(std::memory_order_relaxed) == ThreadStatus::Blocked);
1453  post_block();
1454  }
1455  status.store(ThreadStatus::Spinning, std::memory_order_relaxed);
1456  }
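  // Illustrative sketch (not part of the original file): the SetBlocked /
  // EnsureAwake handshake in isolation. The pusher publishes work before
  // calling Wake(); the sleeper re-checks for work under the lock before
  // sleeping, so a wake-up cannot be lost between the two. The real code
  // additionally publishes the Blocking/Blocked status so that EnsureAwake
  // can skip the mutex entirely while the worker is running or spinning.
  // SleepSlot, Sleep and Wake are hypothetical names used only in this sketch.
  struct SleepSlot {
    OrtMutex m;
    OrtCondVar cv;
    bool asleep = false;

    // Worker side (compare SetBlocked): sleep only if there is still no work.
    void Sleep(const std::function<bool()>& still_no_work) {
      std::unique_lock<OrtMutex> lk(m);
      if (still_no_work()) {  // re-check under the lock
        asleep = true;
        do {
          cv.wait(lk);
        } while (asleep);
      }
    }

    // Pusher side (compare EnsureAwake): call after enqueuing work.
    void Wake() {
      std::unique_lock<OrtMutex> lk(m);
      if (asleep) {
        asleep = false;
        lk.unlock();
        cv.notify_one();
      }
    }
  };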
1457 
1458  private:
1459  std::atomic<ThreadStatus> status{ThreadStatus::Spinning};
1460  OrtMutex mutex;
1461  OrtCondVar cv;
1462  };
1463 
1464  Environment& env_;
1465  const unsigned num_threads_;
1466  const bool allow_spinning_;
1467  const bool set_denormal_as_zero_;
1468  Eigen::MaxSizeVector<WorkerData> worker_data_;
1469  Eigen::MaxSizeVector<Eigen::MaxSizeVector<unsigned>> all_coprimes_;
1470  std::atomic<unsigned> blocked_; // Count of blocked workers, used as a termination condition
1471  std::atomic<bool> done_;
1472 
1473  // SpinLoopStatus indicates whether the main worker spinning (inner) loop should exit immediately when there is
1474  // no work available (kIdle) or whether it should follow the configured spin-then-block policy (kBusy).
1475  // This lets the ORT session layer hint to the thread pool that it should stop spinning in between
1476  // requests.
1477  enum class SpinLoopStatus {
1478  kIdle,
1479  kBusy
1480  };
1481 
1482  // Default is kBusy: no layer above is limiting spinning, so the normal spin-then-block policy applies
1483  std::atomic<SpinLoopStatus> spin_loop_status_{SpinLoopStatus::kBusy};
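  // Illustrative sketch (not part of the original file): a layer above the pool
  // could toggle the spin policy around bursts of work roughly like this. The
  // setter names are hypothetical; the real entry points, if any, are not shown
  // in this excerpt.
  void SketchDisableSpinning() { spin_loop_status_ = SpinLoopStatus::kIdle; }
  void SketchEnableSpinning() { spin_loop_status_ = SpinLoopStatus::kBusy; }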
1484 
1485  // Wake any blocked workers so that they can cleanly exit WorkerLoop(). For
1486  // a clean exit, each thread will observe (1) done_ set, indicating that the
1487  // destructor has been called, (2) all threads blocked, and (3) no
1488  // items in the work queues.
1489 
1490  void WakeAllWorkersForExit() {
1491  for (auto& td : worker_data_) {
1492  td.EnsureAwake();
1493  }
1494  }
1495 
1496  // Main worker thread loop.
1497  void WorkerLoop(int thread_id) {
1498  PerThread* pt = GetPerThread();
1499  WorkerData& td = worker_data_[thread_id];
1500  Queue& q = td.queue;
1501  bool should_exit = false;
1502  pt->pool = this;
1503  pt->thread_id = thread_id;
1504 
1505  assert(td.GetStatus() == WorkerData::ThreadStatus::Spinning);
1506 
1507  constexpr int log2_spin = 20;
1508  const int spin_count = allow_spinning_ ? (1ull << log2_spin) : 0;
1509  const int steal_count = spin_count / 100;
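    // With log2_spin == 20 the spin budget is 2^20 (~1M) iterations before
    // blocking, and a steal is attempted once every spin_count / 100
    // (~10,485) iterations, i.e. roughly 100 steal attempts per full spin.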
1510 
1511  SetDenormalAsZero(set_denormal_as_zero_);
1512  profiler_.LogThreadId(thread_id);
1513 
1514  while (!should_exit) {
1515  Task t = q.PopFront();
1516  if (!t) {
1517  // Spin waiting for work.
1518  for (int i = 0; i < spin_count && !done_; i++) {
1519  if (((i + 1) % steal_count == 0)) {
1520  t = Steal(StealAttemptKind::TRY_ONE);
1521  } else {
1522  t = q.PopFront();
1523  }
1524  if (t) break;
1525 
1526  if (spin_loop_status_.load(std::memory_order_relaxed) == SpinLoopStatus::kIdle) {
1527  break;
1528  }
1530  }
1531 
1532  // Attempt to block
1533  if (!t) {
1534  td.SetBlocked( // Pre-block test
1535  [&]() -> bool {
1536  bool should_block = true;
1537  // Check whether work was pushed to us while attempting to block. We make
1538  // this test while holding the per-thread status lock, and after setting
1539  // our status to ThreadStatus::Blocking.
1540  //
1541  // This synchronizes with ThreadPool::Schedule which pushes work to the queue
1542  // and then tests for ThreadStatus::Blocking/Blocked (via EnsureAwake):
1543  //
1544  // Main thread: Worker:
1545  // #1 Push work #A Set status blocking
1546  // #2 Read worker status #B Check queue
1547  // #3 Wake if blocking/blocked
1548  //
1549  // If #A is before #2 then the main thread sees the worker blocking/blocked and wakes it
1550  //
1551  // If #A is after #2 then #B will see #1, and we abandon blocking
1552  assert(!t);
1553  t = q.PopFront();
1554  if (t) {
1555  should_block = false;
1556  }
1557 
1558  // No work was pushed to us, so continue attempting to block. The remaining
1559  // test synchronizes with termination requests: if we are shutting
1560  // down and all worker threads are blocked without work, then we
1561  // are done.
1562  if (should_block) {
1563  blocked_++;
1564  if (done_ && blocked_ == num_threads_) {
1565  should_block = false;
1566  // Almost done, but need to re-check queues.
1567  // Consider that all queues are empty and all worker threads are preempted
1568  // right after incrementing blocked_ above. Now a free-standing thread
1569  // submits work and calls destructor (which sets done_). If we don't
1570  // re-check queues, we will exit leaving the work unexecuted.
1571  if (NonEmptyQueueIndex() != -1) {
1572  // Note: we must not pop from queues before we decrement blocked_,
1573  // otherwise the following scenario is possible. Consider that instead
1574  // of checking for emptiness we popped the only element from queues.
1575  // Now other worker threads can start exiting, which is bad if the
1576  // work item submits other work. So we just check emptiness here,
1577  // which ensures that all worker threads exit at the same time.
1578  blocked_--;
1579  } else {
1580  should_exit = true;
1581  }
1582  }
1583  }
1584  return should_block;
1585  },
1586  // Post-block update (executed only if we blocked)
1587  [&]() {
1588  blocked_--;
1589  });
1590  // The thread has just unblocked. Unless we picked up work while
1591  // blocking, or are exiting, then either work was pushed directly to
1592  // us, or it was pushed to another thread's overloaded queue.
1593  if (!t) t = q.PopFront();
1594  if (!t) t = Steal(StealAttemptKind::TRY_ALL);
1595  }
1596  }
1597 
1598  if (t) {
1599  td.SetActive();
1600  t();
1601  profiler_.LogRun(thread_id);
1602  td.SetSpinning();
1603  }
1604  }
1605 
1606  // Whichever thread(s) observe the termination conditions are responsible for waking
1607  // any other threads that have remained blocked.
1608  if (should_exit) {
1609  WakeAllWorkersForExit();
1610  }
1611  }
1612 
1613  // Steal tries to steal work from other worker threads in a
1614  // best-effort manner. We steal only from threads that are running
1615  // in user code (ThreadStatus::Active). The intuition is that such a
1616  // thread is busy with its current task, so stealing from it avoids
1617  // "snatching" work from a thread that is just about to notice the
1618  // work itself.
1619 
1620  Task Steal(StealAttemptKind steal_kind) {
1621  PerThread* pt = GetPerThread();
1622  unsigned size = num_threads_;
1623  unsigned num_attempts = (steal_kind == StealAttemptKind::TRY_ALL) ? size : 1;
1624  unsigned r = Rand(&pt->rand);
1625  unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
1626  unsigned victim = r % size;
1627 
1628  for (unsigned i = 0; i < num_attempts; i++) {
1629  assert(victim < size);
1630  if (worker_data_[victim].GetStatus() == WorkerData::ThreadStatus::Active) {
1631  Task t = worker_data_[victim].queue.PopBack();
1632  if (t) {
1633  return t;
1634  }
1635  }
1636  victim += inc;
1637  if (victim >= size) {
1638  victim -= size;
1639  }
1640  }
1641 
1642  return Task();
1643  }
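  // Illustrative sketch (not part of the original file): stepping through the
  // victims by an increment drawn from all_coprimes_[size - 1] visits every
  // index exactly once before repeating, so a TRY_ALL pass probes each worker
  // once in a pseudo-random order. VisitsAllOnce is a hypothetical helper that
  // checks this property for a given (size, inc) pair.
  static bool VisitsAllOnce(unsigned size, unsigned inc) {
    if (size == 0 || inc == 0 || inc >= size) return false;
    // Starting from 0 and stepping by inc (mod size) returns to 0 after
    // size / gcd(size, inc) steps; the walk covers every index exactly when
    // that count equals size, i.e. when inc and size are coprime.
    unsigned v = 0;
    unsigned steps = 0;
    do {
      v += inc;
      if (v >= size) v -= size;
      ++steps;
    } while (v != 0);
    return steps == size;  // e.g. VisitsAllOnce(8, 3) is true, VisitsAllOnce(8, 2) is false
  }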
1644 
1645  int NonEmptyQueueIndex() {
1646  PerThread* pt = GetPerThread();
1647  const unsigned size = static_cast<unsigned>(worker_data_.size());
1648  unsigned r = Rand(&pt->rand);
1649  unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
1650  unsigned victim = r % size;
1651  for (unsigned i = 0; i < size; i++) {
1652  if (!worker_data_[victim].queue.Empty()) {
1653  return victim;
1654  }
1655  victim += inc;
1656  if (victim >= size) {
1657  victim -= size;
1658  }
1659  }
1660  return -1;
1661  }
1662 
1663  static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
1664  return std::hash<std::thread::id>()(std::this_thread::get_id());
1665  }
1666 
1667  static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
1668  static thread_local PerThread per_thread_;
1669  PerThread* pt = &per_thread_;
1670  if (!pt->initialized) {
1671  pt->rand = GlobalThreadIdHash();
1672  pt->initialized = true;
1673  }
1674  return pt;
1675  }
1676 
1677  static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
1678  uint64_t current = *state;
1679  // Update the internal state
1680  *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
1681  // Generate the random output (using the PCG-XSH-RS scheme)
1682  return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
1683  }
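  // Illustrative sketch (not part of the original file): the output step of
  // Rand() decomposed. The multiplier above is the MMIX LCG constant, and the
  // top three bits of the previous state select a shift in [22, 29] that is
  // applied after a 22-bit xorshift (PCG's "XSH-RS" output permutation).
  // SketchPcgXshRs is a hypothetical helper equivalent to Rand()'s return value.
  static unsigned SketchPcgXshRs(uint64_t current) {
    const uint64_t xorshifted = current ^ (current >> 22);                    // XSH: fold high bits down
    const unsigned random_shift = 22 + static_cast<unsigned>(current >> 61);  // RS: shift in [22, 29]
    return static_cast<unsigned>(xorshifted >> random_shift);
  }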
1684 };
1685 
1686 } // namespace concurrency
1687 
1688 } // namespace onnxruntime