HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
IlmThreadProcessGroup.h
Go to the documentation of this file.
1 //
2 // SPDX-License-Identifier: BSD-3-Clause
3 // Copyright (c) Contributors to the OpenEXR Project.
4 //
5 
6 #ifndef INCLUDED_ILM_THREAD_PROCESS_GROUP_H
7 #define INCLUDED_ILM_THREAD_PROCESS_GROUP_H
8 
9 //-----------------------------------------------------------------------------
10 //
11 // Class ProcessGroup is a templated inline helper for constraining
12 // task contexts to a number of threads. It maintains a list of
13 // contexts and then can hand them out one at a time, waiting for a
14 // previous thread request to finish before handing out more,
15 // preventing over-subscription / allocation of contexts.
16 //
17 //-----------------------------------------------------------------------------
18 
19 #include "IlmThreadConfig.h"
20 #include "IlmThreadExport.h"
21 #include "IlmThreadNamespace.h"
22 #include "IlmThreadSemaphore.h"
23 
24 #include "Iex.h"
25 
26 #include <atomic>
27 #include <string>
28 #include <type_traits>
29 #include <vector>
30 
32 
33 template <typename P,
36  std::is_same <decltype (P {}.next), P *>::value, bool> = true>
38 {
39 public:
40  using Process = P;
41 
42  ProcessGroup (unsigned int numThreads)
43  : _sem (numThreads)
44  , _avail_head (nullptr)
45  , _first_failure (nullptr)
46  {
47  _fixed_pool.resize (numThreads);
48  for ( unsigned int i = 0; i < numThreads; ++i )
49  {
50  if (i == (numThreads - 1))
51  _fixed_pool[i].next = nullptr;
52  else
53  _fixed_pool[i].next = &(_fixed_pool[i+1]);
54  }
55  _avail_head = &(_fixed_pool[0]);
56  }
57 
58  ProcessGroup (const ProcessGroup&) = delete;
59  ProcessGroup& operator= (const ProcessGroup&) = delete;
60  ProcessGroup (ProcessGroup&&) = default;
63  {
64  std::string *cur = _first_failure.load ();
65  delete cur;
66  }
67 
68  void push (Process *p)
69  {
70  Process* oldhead = _avail_head.load (std::memory_order_relaxed);
71 
72  do
73  {
74  p->next = oldhead;
75  } while (!_avail_head.compare_exchange_weak (
76  oldhead, p,
77  std::memory_order_release,
78  std::memory_order_relaxed));
79 
80  // notify someone else there's one available
81  _sem.post ();
82  }
83 
84  // called by the thread dispatching work units, may block
86  {
87  Process* ret = nullptr;
88 
89  // we do not have to worry about ABA problems as
90  // we have a static pool of items we own, we're just
91  // putting them here and popping them off.
92 
93  // used for honoring the numThreads, as pop
94  // should only be called by the one thread
95  // waiting to submit thread calls
96  _sem.wait ();
97 
98  ret = _avail_head.load (std::memory_order_acquire);
99 
100  Process* newhead;
101  do
102  {
103  if (!ret)
104  std::cerr << "GACK: serious failure case???" << std::endl;
105 
106  newhead = ret->next;
107  } while ( !_avail_head.compare_exchange_weak(
108  ret, newhead, std::memory_order_acquire));
109 
110  return ret;
111  }
112 
113  void record_failure (const char *e)
114  {
115  // should we construct a list of failures if there are
116  // more than one? seems less confusing to just report
117  // the first we happened to record
118 
119  std::string *cur = _first_failure.load ();
120  if (!cur)
121  {
122  std::string *msg = new std::string (e);
123  if (! _first_failure.compare_exchange_strong (cur, msg))
124  delete msg;
125  }
126  }
127 
129  {
130  std::string *cur = _first_failure.load ();
131  _first_failure.store (nullptr);
132 
133  if (cur)
134  {
135  std::string msg (*cur);
136  delete cur;
137 
138  throw IEX_NAMESPACE::IoExc (msg);
139  }
140  }
141 private:
142  Semaphore _sem;
143 
144  std::vector<Process> _fixed_pool;
145 
146  std::atomic<Process *> _avail_head;
147 
148  std::atomic<std::string *> _first_failure;
149 };
150 
151 
153 
154 #endif // INCLUDED_ILM_THREAD_PROCESS_GROUP_H
typename std::enable_if< B, T >::type enable_if_t
Define Imath::enable_if_t to be std for C++14, equivalent for C++11.
GLsizei const GLfloat * value
Definition: glcorearb.h:824
void push(Process *p)
#define ILMTHREAD_INTERNAL_NAMESPACE_HEADER_ENTER
#define ILMTHREAD_INTERNAL_NAMESPACE_HEADER_EXIT
void record_failure(const char *e)
BaseExc IoExc
Definition: IexBaseExc.h:162
ProcessGroup(unsigned int numThreads)
LeafData & operator=(const LeafData &)=delete