#include <type_traits>

#include "onnxruntime_config.h"
#if defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic ignored "-Wunused-result"
#ifdef HAS_CLASS_MEMACCESS
#pragma GCC diagnostic ignored "-Wclass-memaccess"
#endif
#elif defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable : 4127)
#pragma warning(disable : 4805)
#endif
#include "unsupported/Eigen/CXX11/ThreadPool"
#if defined(__GNUC__)
#pragma GCC diagnostic pop
#elif defined(_MSC_VER)
#pragma warning(pop)
#endif
namespace onnxruntime {
namespace concurrency {
#if defined(__x86_64__)
#define ORT_FALSE_SHARING_BYTES 128
#else
#define ORT_FALSE_SHARING_BYTES 64
#endif

#define ORT_ALIGN_TO_AVOID_FALSE_SHARING alignas(ORT_FALSE_SHARING_BYTES)
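// Illustrative sketch (not from the original header): the macro is meant to be
// applied to types whose instances are written by different threads, e.g.
//
//   struct ORT_ALIGN_TO_AVOID_FALSE_SHARING PerWorkerCounter {
//     std::atomic<uint64_t> value{0};
//   };
//
// With alignas(ORT_FALSE_SHARING_BYTES), adjacent PerWorkerCounter instances in
// an array land on distinct cache lines, so per-thread updates do not contend.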
#ifdef ORT_MINIMAL_BUILD
using Clock = std::chrono::high_resolution_clock;

void LogRun(int thread_idx);

struct MainThreadStat {
  std::vector<std::ptrdiff_t> blocks_;  // block size for each parallel call
  std::vector<onnxruntime::TimePoint> points_;
  void LogBlockSize(std::ptrdiff_t block_size);
};

bool enabled_ = false;
MainThreadStat& GetMainThreadStat();
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4324)  // structure was padded due to alignment specifier
#endif

uint64_t num_run_ = 0;

std::vector<ChildThreadStat> child_thread_stats_;
virtual void RunInParallelSection(ThreadPoolParallelSection& ps,
                                  std::function<void(unsigned idx)> fn,
                                  unsigned n, std::ptrdiff_t block_size) = 0;
virtual void RunInParallel(std::function<void(unsigned idx)> fn,
                           unsigned n, std::ptrdiff_t block_size) = 0;
const std::function<void(unsigned)> fn;
template <typename Work, typename Tag, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    assert((kSize & (kSize - 1)) == 0);  // kSize must be a power of two
    assert(kSize <= (64 << 10));         // leave enough space for the counter
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(ElemState::kEmpty, std::memory_order_relaxed);
  }
front = front_.load(std::memory_order_relaxed);
e = &array_[(front - 1) & kMask];
s = e->state.load(std::memory_order_relaxed);
if (s == ElemState::kRevoked &&
    e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
  e->state.store(ElemState::kEmpty, std::memory_order_release);
  front = ((front - 1) & kMask2) | (front & ~kMask2);
  front_.store(front, std::memory_order_relaxed);
}
if (s != ElemState::kReady ||
    !e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
  return Work();
Work w = std::move(e->w);
front = ((front - 1) & kMask2) | (front & ~kMask2);
front_.store(front, std::memory_order_relaxed);
return w;
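// Only the owner thread operates on the front of its queue, so PopFront above
// synchronizes through the per-element state CAS alone; PushBack, PushBackWithTag,
// PopBack, and RevokeWithTag below can be called from other threads and therefore
// serialize on mutex_ in addition to the element-state transitions.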
std::lock_guard<OrtMutex> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
Elem& e = array_[(back - 1) & kMask];
ElemState s = e.state.load(std::memory_order_relaxed);
if (s != ElemState::kEmpty ||
    !e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
  return w;
back = ((back - 1) & kMask2) | (back & ~kMask2);
back_.store(back, std::memory_order_relaxed);
std::lock_guard<OrtMutex> lock(mutex_);
unsigned back = back_.load(std::memory_order_relaxed);
w_idx = (back - 1) & kMask;
Elem& e = array_[w_idx];
ElemState s = e.state.load(std::memory_order_relaxed);
if (s != ElemState::kEmpty ||
    !e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
  return PushResult::REJECTED;  // not enqueued
bool was_ready = (((back ^ (front_.load(std::memory_order_relaxed))) & kMask) == 0);
back = ((back - 1) & kMask2) | (back & ~kMask2);
back_.store(back, std::memory_order_relaxed);
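// was_ready compares the pre-push back index with the front index: when they
// coincide under kMask, the queue was empty and the new item is immediately
// visible at the front.  A plausible reading of the PushResult values used in
// this file is ACCEPTED_IDLE for that case and ACCEPTED_BUSY when other work
// was already queued, so callers know whether waking one extra thread (a
// potential thief) is worthwhile.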
std::lock_guard<OrtMutex> lock(mutex_);

back = back_.load(std::memory_order_relaxed);
e = &array_[back & kMask];
s = e->state.load(std::memory_order_relaxed);
if (s == ElemState::kRevoked &&
    e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
  e->state.store(ElemState::kEmpty, std::memory_order_release);
  back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
}
if (s != ElemState::kReady ||
    !e->state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire))
  return Work();
Work w = std::move(e->w);
back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
return w;
bool revoked = false;
std::lock_guard<OrtMutex> lock(mutex_);
Elem& e = array_[w_idx];
ElemState s = e.state.load(std::memory_order_relaxed);
if (s == ElemState::kReady && e.tag == tag &&
    e.state.compare_exchange_strong(s, ElemState::kBusy, std::memory_order_acquire)) {
  unsigned back = back_.load(std::memory_order_relaxed);
  unsigned back_idx = back & kMask;
  if (back_idx != w_idx) {
    // Not at the back of the queue: mark the slot revoked in place.
    e.state.store(ElemState::kRevoked, std::memory_order_release);
  } else {
    // Still at the back: remove the item and advance the back counter.
    e.state.store(ElemState::kEmpty, std::memory_order_release);
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
  }
  revoked = true;
}
return revoked;
unsigned Size() const { return SizeOrNotEmpty<true>(); }
bool Empty() const { return SizeOrNotEmpty<false>() == 0; }

static const unsigned kMask = kSize - 1;
static const unsigned kMask2 = (kSize << 1) - 1;
enum class ElemState : uint8_t { kEmpty, kBusy, kReady, kRevoked };
struct Elem {
  std::atomic<ElemState> state;
  Tag tag;
  Work w;
};
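// The low log2(kSize) + 1 bits of front_ and back_ hold the rolling ring-buffer
// position (kMask selects the slot, kMask2 the position plus one wrap bit), and
// the remaining upper bits form a modification counter bumped on pushes.  The
// extra wrap bit distinguishes a full queue from an empty one, and the counter
// lets SizeOrNotEmpty() detect concurrent modification while snapshotting.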
template <bool NeedSizeEstimate>
unsigned SizeOrNotEmpty() const {
  unsigned front = front_.load(std::memory_order_acquire);
  for (;;) {
    // Capture a consistent snapshot of front/back.
    unsigned back = back_.load(std::memory_order_acquire);
    unsigned front1 = front_.load(std::memory_order_relaxed);
    if (front != front1) {
      front = front1;
      std::atomic_thread_fence(std::memory_order_acquire);
      continue;
    }
    if (NeedSizeEstimate) {
      return CalculateSize(front, back);
    }
    // This value will be 0 if the queue is empty, and undefined otherwise.
    unsigned maybe_zero = ((front ^ back) & kMask2);
    eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
    return maybe_zero;
  }
}
unsigned CalculateSize(unsigned front, unsigned back) const {
  int size = (front & kMask2) - (back & kMask2);
  // Fix overflow.
  if (size < 0)
    size += 2 * kSize;
  // Size can transiently exceed kSize during concurrent push/pop; clamp it.
  if (size > static_cast<int>(kSize))
    size = kSize;
  return static_cast<unsigned>(size);
}
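// Worked example (hypothetical values, e.g. kSize = 1024 so kMask2 = 2047):
// with front_ & kMask2 == 10 and back_ & kMask2 == 2040, the raw difference is
// 10 - 2040 = -2030, and adding 2 * kSize = 2048 yields a size of 18.  The
// clamp to kSize covers the transient case where a concurrent push has
// incremented the size before the matching pop has decremented it.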
RunQueue(const RunQueue&) = delete;
void operator=(const RunQueue&) = delete;
};

static std::atomic<uint32_t> next_tag{1};
template <typename Environment>
class ThreadPoolTempl : public onnxruntime::concurrency::ExtendedThreadPoolInterface {
  static unsigned WorkerLoop(int id, Eigen::ThreadPoolInterface* param) {
    ThreadPoolTempl* this_ptr = (ThreadPoolTempl*)param;
    this_ptr->WorkerLoop(id);
    return 0;
  }
void SignalAllAndWait() {
  done_ = true;
  // Wake any blocked workers so they can observe done_ and exit.
  WakeAllWorkersForExit();
  // Join threads explicitly (by destroying them) to control destruction order.
  for (size_t i = 0; i < worker_data_.size(); ++i) worker_data_[i].thread.reset();
}

std::string StopProfiling() override {
  return profiler_.Stop();
}
bool operator==(Tag& other) const {
  return v_ == other.v_;
}

typedef std::function<void()> Task;
ThreadPoolTempl(const CHAR_TYPE* name, int num_threads, bool allow_spinning,
                Environment& env, const ThreadOptions& thread_options)
    : profiler_(num_threads, name),
      env_(env),
      num_threads_(num_threads),
      allow_spinning_(allow_spinning),
      set_denormal_as_zero_(thread_options.set_denormal_as_zero),
      worker_data_(num_threads),
      all_coprimes_(num_threads),
      blocked_(0),
      done_(false) {
  // Calculate coprimes of all numbers [1, num_threads], used for the random
  // permutation walks in Steal() and NonEmptyQueueIndex().
  for (auto i = 1u; i <= num_threads_; ++i) {
    all_coprimes_.emplace_back(i);
    ComputeCoprimes(i, &all_coprimes_.back());
  }

  worker_data_.resize(num_threads_);
  for (auto i = 0u; i < num_threads_; i++) {
    worker_data_[i].thread.reset(env_.CreateThread(name, i, WorkerLoop, this, thread_options));
  }
}
void Schedule(std::function<void()> fn) override {
  PerThread* pt = GetPerThread();
  int q_idx = Rand(&pt->rand) % num_threads_;
  WorkerData& td = worker_data_[q_idx];
assert((!pt.leading_par_section) && "Nested parallelism not supported");
assert((!ps.active) && "Starting parallel section, but active already");
pt.leading_par_section = true;
PerThread* pt = GetPerThread();

assert((pt.leading_par_section) && "Ending parallel section, but none started");
assert((ps.active) && "Ending parallel section, but not active");
pt.leading_par_section = false;
unsigned tasks_started = static_cast<unsigned>(ps.tasks.size());
while (!ps.tasks.empty()) {
  const auto& item = ps.tasks.back();
  Queue& q = worker_data_[item.first].queue;
  if (q.RevokeWithTag(pt.tag, item.second)) {
    ps.tasks_revoked++;
  }
  ps.tasks.pop_back();
}

// Wait for the dispatch task's own work to complete.
while (!ps.work_done.load(std::memory_order_acquire)) {
  onnxruntime::concurrency::SpinPause();
}

PerThread* pt = GetPerThread();
static std::atomic<unsigned> next_worker{0};

if (preferred_workers.empty()) {
  preferred_workers.push_back(-1);
}

while (preferred_workers.size() <= num_threads_) {
  preferred_workers.push_back(next_worker++ % num_threads_);
}
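// A single static next_worker counter means successive threads initializing
// their preferred_workers vectors start the round-robin assignment at different
// workers, spreading initial task placement across the pool instead of
// repeatedly targeting the same queues.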
unsigned ran_on_idx = GetPerThread()->thread_id;
assert(ran_on_idx < num_threads_);
assert(par_idx < preferred_workers.size());
preferred_workers[par_idx] = ran_on_idx;
void ScheduleOnPreferredWorkers(PerThread& pt,
                                ThreadPoolParallelSection& ps,
                                InlinedVector<int>& preferred_workers,
                                unsigned par_idx_start,
                                unsigned par_idx_end,
                                std::function<void(unsigned)> worker_fn) {
  for (auto par_idx = par_idx_start; par_idx < par_idx_end; ++par_idx) {
    assert(par_idx < preferred_workers.size());
    unsigned q_idx = preferred_workers[par_idx] % num_threads_;
    assert(q_idx < num_threads_);
    WorkerData& td = worker_data_[q_idx];
    Queue& q = td.queue;
    unsigned w_idx;

    // Enqueue the task, tagged so it can be revoked at the end of the section.
    auto push_status = q.PushBackWithTag([worker_fn, par_idx, &preferred_workers, &ps, this]() {

    ps.tasks.push_back({q_idx, w_idx});

    worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake();
void RunInParallelInternal(PerThread& pt,
                           ThreadPoolParallelSection& ps,
                           unsigned new_dop,
                           bool dispatch_async,
                           std::function<void(unsigned)> worker_fn) {
  assert(new_dop <= (unsigned)(num_threads_ + 1));
  auto& preferred_workers = pt.preferred_workers;
  unsigned current_dop = ps.current_dop;

  if (current_dop < new_dop) {
    unsigned extra_needed = new_dop - current_dop;

    if (dispatch_async && extra_needed > 1) {
      assert(current_dop == 1);

      Task dispatch_task = [current_dop, new_dop, worker_fn, &preferred_workers, &ps, &pt, this]() {
        // Record that dispatch has started, prior to scheduling tasks.
        ps.dispatch_started.store(true, std::memory_order_seq_cst);

        ps.dispatch_done.store(true, std::memory_order_release);

        // Run the dispatcher's own work, then mark it complete.
        worker_fn(current_dop);

        ps.work_done.store(true, std::memory_order_release);
      };

      ps.dispatch_q_idx = preferred_workers[current_dop] % num_threads_;
      WorkerData& dispatch_td = worker_data_[ps.dispatch_q_idx];
      Queue& dispatch_que = dispatch_td.queue;
      auto push_status = dispatch_que.PushBackWithTag(dispatch_task, pt.tag, ps.dispatch_w_idx);
      if (push_status == PushResult::ACCEPTED_IDLE || push_status == PushResult::ACCEPTED_BUSY) {
        dispatch_td.EnsureAwake();
        if (push_status == PushResult::ACCEPTED_BUSY) {
          worker_data_[Rand(&pt.rand) % num_threads_].EnsureAwake();
        }
      } else {
        ps.dispatch_q_idx = -1;  // failed to enqueue dispatch_task
      }
void RunInParallelSection(ThreadPoolParallelSection& ps,
                          std::function<void(unsigned idx)> fn,
                          unsigned n,
                          std::ptrdiff_t block_size) override {
  ORT_ENFORCE(n <= num_threads_ + 1, "More work items than threads");
  PerThread* pt = GetPerThread();
  assert(pt->leading_par_section && "RunInParallel, but not in parallel section");
  assert((n > 1) && "Trivial parallel section; should be avoided by caller");

  assert((!ps.current_loop) && "RunInParallelSection, but loop already active");

  std::function<void(unsigned)> worker_fn = [&ps](unsigned par_idx) {
    ThreadPoolLoop* work_item = ps.current_loop;
    if (work_item && par_idx < work_item->threads_needed) {
      work_item->fn(par_idx);
    }
void RunInParallel(std::function<void(unsigned idx)> fn,
                   unsigned n, std::ptrdiff_t block_size) override {
  ORT_ENFORCE(n <= num_threads_ + 1, "More work items than threads");
  PerThread* pt = GetPerThread();
int NumThreads() const final {
  return num_threads_;
}

int CurrentThreadId() const final {
  const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
  if (pt->pool == this) {
    return pt->thread_id;
  }
void ComputeCoprimes(int N, Eigen::MaxSizeVector<unsigned>* coprimes) {
  for (int i = 1; i <= N; i++) {
    // Keep i only if gcd(i, N) == 1, computed via Euclid's algorithm.
    unsigned a = i, b = N;
    while (b != 0) { unsigned tmp = a; a = b; b = tmp % b; }
    if (a == 1) coprimes->push_back(i);
  }
}
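// The coprime tables drive the pseudo-random walks in Steal() and
// NonEmptyQueueIndex(): starting from a random index t and stepping by a value
// coprime with the thread count visits every queue exactly once before
// repeating.  For example, ComputeCoprimes(4, ...) yields {1, 3}; stepping by
// 3 modulo 4 from t = 2 visits 2, 1, 0, 3.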
typedef typename Environment::EnvThread Thread;
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable : 4324)  // structure was padded due to alignment specifier
#endif
struct ORT_ALIGN_TO_AVOID_FALSE_SHARING PerThread {
  constexpr PerThread() : pool(nullptr) {
  }
  bool initialized{false};          // Non-trivial initialization has run
  bool leading_par_section{false};  // Leading a parallel section (used only for asserts)
  InlinedVector<int> preferred_workers;
};
#ifdef _MSC_VER
#pragma warning(pop)
#endif

std::unique_ptr<Thread> thread;
enum class ThreadStatus : uint8_t { Spinning, Active, Blocking, Blocked, Waking };

ThreadStatus GetStatus() const {
  return status;
}
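// Status values, as used elsewhere in this file: Spinning (polling for work in
// the worker loop), Active (running a task), Blocking (transiently deciding
// whether to sleep), Blocked (waiting on the condition variable), and Waking
// (wake-up sent, but the thread is not yet back in the loop).  EnsureAwake()
// below first reads the status without the lock and only takes the mutex when
// it observes Blocking or Blocked.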
void EnsureAwake() {
  ThreadStatus seen = GetStatus();
  if (seen == ThreadStatus::Blocking ||
      seen == ThreadStatus::Blocked) {
    std::unique_lock<OrtMutex> lk(mutex);
    // Blocking exists only transiently while SetBlocked holds the lock, so once
    // we hold the lock ourselves, the thread is either blocked or not.
    seen = status.load(std::memory_order_relaxed);
    assert(seen != ThreadStatus::Blocking);
    if (seen == ThreadStatus::Blocked) {
      status.store(ThreadStatus::Waking, std::memory_order_relaxed);
      lk.unlock();
      cv.notify_one();
    }
  }
}

void SetActive() {
  status = ThreadStatus::Active;
}
void SetSpinning() {
  status = ThreadStatus::Spinning;
}

void SetBlocked(std::function<bool()> should_block,
                std::function<void()> post_block) {
  std::unique_lock<OrtMutex> lk(mutex);
  assert(GetStatus() == ThreadStatus::Spinning);
  status.store(ThreadStatus::Blocking, std::memory_order_relaxed);
  if (should_block()) {
    status.store(ThreadStatus::Blocked, std::memory_order_relaxed);
    do {
      cv.wait(lk);
    } while (status.load(std::memory_order_relaxed) == ThreadStatus::Blocked);
    post_block();
  }
  status.store(ThreadStatus::Spinning, std::memory_order_relaxed);
}
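// The two-phase protocol here pairs with EnsureAwake(): SetBlocked advertises
// Blocking while holding the lock, re-checks for work via the caller-supplied
// should_block() predicate, and only then commits to Blocked.  A producer that
// pushes work and then observes Blocking/Blocked must take the same lock, so
// either the sleeper sees the new work in should_block(), or the producer sees
// Blocked and notifies the condition variable.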
std::atomic<ThreadStatus> status{ThreadStatus::Spinning};
OrtMutex mutex;
OrtCondVar cv;

Environment& env_;
const unsigned num_threads_;
const bool allow_spinning_;
const bool set_denormal_as_zero_;
Eigen::MaxSizeVector<WorkerData> worker_data_;
Eigen::MaxSizeVector<Eigen::MaxSizeVector<unsigned>> all_coprimes_;
std::atomic<unsigned> blocked_;  // Count of blocked workers, used as a termination condition
std::atomic<bool> done_;
enum class SpinLoopStatus {
void WakeAllWorkersForExit() {
  for (auto& td : worker_data_) {
    td.EnsureAwake();
  }
}
void WorkerLoop(int thread_id) {
  PerThread* pt = GetPerThread();
  WorkerData& td = worker_data_[thread_id];
  Queue& q = td.queue;
  bool should_exit = false;

  assert(td.GetStatus() == WorkerData::ThreadStatus::Spinning);

  constexpr int log2_spin = 20;
  const int spin_count = allow_spinning_ ? (1ull << log2_spin) : 0;
  const int steal_count = spin_count / 100;
while (!should_exit) {
  Task t = q.PopFront();
  if (!t) {
    // Spin waiting for work, stealing periodically.
    for (int i = 0; i < spin_count && !done_; i++) {
      if (((i + 1) % steal_count == 0)) {
bool should_block = true;
// Check whether work was pushed to us while attempting to block.
t = q.PopFront();
if (t) {
  should_block = false;
}
if (should_block) {
  blocked_++;
  // If we are shutting down and all workers are blocked without work, exit.
  if (done_ && blocked_ == num_threads_) {
    should_block = false;
    // Almost done, but need to re-check the queues.
    if (NonEmptyQueueIndex() != -1) {
      blocked_--;
    } else {
      should_exit = true;
    }
  }
}
return should_block;
if (!t) t = q.PopFront();

profiler_.LogRun(thread_id);

WakeAllWorkersForExit();
PerThread* pt = GetPerThread();
unsigned size = num_threads_;
unsigned r = Rand(&pt->rand);
unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
unsigned victim = r % size;

for (unsigned i = 0; i < num_attempts; i++) {
  assert(victim < size);
  if (worker_data_[victim].GetStatus() == WorkerData::ThreadStatus::Active) {
    Task t = worker_data_[victim].queue.PopBack();
    if (t) {
      return t;
    }
  }
  victim += inc;
  if (victim >= size) {
    victim -= size;
  }
}
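// Steal only targets victims whose status is Active: a thread running user code
// is not watching its own queue, so popping from its back is unlikely to snatch
// an item that the victim was just given and has not yet noticed.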
int NonEmptyQueueIndex() {
  PerThread* pt = GetPerThread();
  const unsigned size = static_cast<unsigned>(worker_data_.size());
  unsigned r = Rand(&pt->rand);
  unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
  unsigned victim = r % size;
  for (unsigned i = 0; i < size; i++) {
    if (!worker_data_[victim].queue.Empty()) {
      return victim;
    }
    victim += inc;
    if (victim >= size) {
      victim -= size;
    }
  }
  return -1;
}
static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
  return std::hash<std::thread::id>()(std::this_thread::get_id());
}

static EIGEN_STRONG_INLINE PerThread* GetPerThread() {
  static thread_local PerThread per_thread_;
  PerThread* pt = &per_thread_;
  if (!pt->initialized) {
    pt->rand = GlobalThreadIdHash();
    pt->initialized = true;
  }
  return pt;
}
static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
  uint64_t current = *state;
  // Update the internal state.
  *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
  // Generate the random output (using the PCG-XSH-RS scheme).
  return static_cast<unsigned>((current ^ (current >> 22)) >> (22 + (current >> 61)));
}
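// The multiplier 6364136223846793005ULL is the 64-bit LCG constant also used by
// PCG, and the output stage is the PCG XSH-RS permutation named in the comment
// above.  A minimal standalone sketch of the same generator (hypothetical test
// code, not part of this header):
//
//   uint64_t state = 0x853c49e6748fea9bULL;  // arbitrary seed
//   for (int i = 0; i < 4; i++) {
//     uint64_t current = state;
//     state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
//     unsigned r = static_cast<unsigned>(
//         (current ^ (current >> 22)) >> (22 + (current >> 61)));
//     // r is then typically reduced modulo num_threads_ to pick a queue.
//   }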