UT_TaskState.h
/*
 * PROPRIETARY INFORMATION. This software is proprietary to
 * Side Effects Software Inc., and is not to be reproduced,
 * transmitted, or disclosed in any way without written permission.
 *
 * NAME: UT_TaskState.h (UT Library, C++)
 *
 * COMMENTS:
 */

#pragma once

#ifndef __UT_TASKSTATE_H_INCLUDED__
#define __UT_TASKSTATE_H_INCLUDED__

#include "UT_API.h"

#include "UT_Task.h"
#include "UT_TaskArena.h"
#include "UT_Thread.h"
#include <SYS/SYS_AtomicInt.h>
#include <SYS/SYS_AtomicPtr.h>
#include <utility>

/// A task representing a wait for completion of a task by another thread.
/// We maintain a thread-safe linked list of these objects in UT_TaskState.
class UT_API UT_TaskStateProxy : public UT_EmptyTask
{
public:
    UT_TaskStateProxy()
        : myNext(nullptr)
    {
    }

private:
    UT_TaskStateProxy *myNext;

    friend class UT_TaskState;
};
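
// A UT_TaskStateProxy is allocated as a child of a waiting task in
// UT_TaskState::addWaitingTask() and spawned from UT_TaskState::markAsDone()
// (or immediately, if the list has already been plugged). Because it does no
// work of its own, it completes right away; its only effect is to decrement
// the waiting task's ref count so the scheduler can run that task again.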

/// A task node for managing which thread is currently working on a given task.
class UT_API UT_TaskState
{
    /// Special value used to indicate that myWaitingTasks is not accepting
    /// additions.
    ///
    /// When the task state is "unplugged", it means that it's still accepting
    /// compute or wait tasks. When it's "plugged", it no longer accepts tasks.
    static UT_TaskStateProxy *plugged()
        { return (UT_TaskStateProxy *)uintptr_t(-1); }

public:
    enum TaskStatus
    {
        FREE,           ///< Thread has acquired responsibility to evaluate the node.
        BUSY_WITH_ARENA,///< Another thread is busy evaluating the node with an arena.
        BUSY_NO_ARENA,  ///< Another thread is busy evaluating the node without an arena.
        DONE_WITH_ARENA,///< The node has been evaluated with an arena.
        DONE_NO_ARENA   ///< The node has been evaluated without an arena.
    };
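
    // Overview of the state transitions implemented by the methods below:
    //   FREE          -> BUSY_NO_ARENA / BUSY_WITH_ARENA  via tryMarkAsBusy()
    //   BUSY_*        -> DONE_NO_ARENA / DONE_WITH_ARENA  via markAsDone() or markAsDoneNoThread()
    //   DONE_NO_ARENA -> BUSY_NO_ARENA                    via tryMarkAsBusyFromDone()
    //   DONE_*        -> FREE                             via reset() (non-threaded only)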

    UT_TaskState()
        : myArenaAndGroupRefCount(0)
    {
        SYS_STATIC_ASSERT_MSG(sizeof(UT_TaskState) == 2*sizeof(SYS_AtomicInt32) + sizeof(void*),
            "This is just to check that the union is putting the two pointers "
            "in the same place.");

        reset();
    }
    ~UT_TaskState()
    {
        UT_ASSERT_P(myArenaAndGroupRefCount.relaxedLoad() == 0);
        UT_ASSERT_P(myArenaAndGroup.relaxedLoad() == nullptr || myWaitingTasks.relaxedLoad() == plugged());
    }

    /// Set the state to be "free" with no waiting tasks.
    /// This cannot be called from within threaded code.
    void
    reset()
    {
        myStatus.relaxedStore(FREE);
        UT_ASSERT_P(myArenaAndGroupRefCount.relaxedLoad() == 0);
        // This will also assign nullptr to myWaitingTasks.
        // It doesn't need to be atomic, because nothing else is allowed to
        // access it while we're resetting.
        myArenaAndGroup.relaxedStore(nullptr);
    }

    /// Test whether the task state is marked as DONE.
    bool isDone() const
    {
        auto state = myStatus.relaxedLoad();
        return state == DONE_NO_ARENA || state == DONE_WITH_ARENA;
    }

    /// Attempt to claim this node for the calling thread, returning the
    /// current status.
    TaskStatus
    tryMarkAsBusy(bool run_in_task_arena=false)
    {
        auto state = myStatus.relaxedLoad();
        if (state == DONE_WITH_ARENA || state == DONE_NO_ARENA || state == BUSY_NO_ARENA)
            return (TaskStatus)state;

        // Increment the reference count before checking myStatus again,
        // to avoid a race condition where a thread might delete it
        // early. We decrement it afterward if it turns out that we don't
        // need the reference after all. Just in case run_in_task_arena is
        // set inconsistently between threads, we need the same increment
        // regardless of whether run_in_task_arena is true, and regardless
        // of whether state was already BUSY_WITH_ARENA.
        myArenaAndGroupRefCount.add(1);

        // If we're currently FREE, only allow one thread to change the status
        // to BUSY. Otherwise, return the BUSY/DONE state.
        state = myStatus.compare_swap(FREE, run_in_task_arena ? BUSY_WITH_ARENA : BUSY_NO_ARENA);

        if (state == DONE_NO_ARENA || state == BUSY_NO_ARENA || (!run_in_task_arena && state == FREE))
            myArenaAndGroupRefCount.add(-1);
        else if (state == DONE_WITH_ARENA)
            decrefArenaGroup();

        return (TaskStatus)state;
    }
    /// Assuming that we're done, claim the node for the calling thread.
    /// Returns FREE if it succeeded, BUSY_NO_ARENA if it failed.
    TaskStatus
    tryMarkAsBusyFromDone()
    {
        UT_ASSERT_MSG(0, "This is untested code. Note that calling code in the build is now unused."
            " If myArenaAndGroup was used before, it must be used on subsequent executes.");
        if (myStatus.compare_swap(DONE_NO_ARENA, BUSY_NO_ARENA) == DONE_NO_ARENA)
        {
            // If myWaitingTasks is plugged(), we need to clear it to null,
            // but if we're using a task arena, myArenaAndGroupRefCount may
            // still be non-zero, depending on how this function is supposed
            // to be used. If it is non-zero, this code is wrong.
            UT_ASSERT(myArenaAndGroupRefCount.relaxedLoad() == 0);
            myArenaAndGroup.relaxedStore(nullptr);
            return FREE;
        }
        return BUSY_NO_ARENA;
    }
    /// Assuming that we're busy, add a waiting task to be spawned when we're
    /// free again. This is done in a lock-free, linked-list style.
    void
    addWaitingTask(UT_Task &parent_task)
    {
        UT_ASSERT_P(myStatus.relaxedLoad() == BUSY_NO_ARENA || myStatus.relaxedLoad() == DONE_NO_ARENA);

        UT_TaskStateProxy *proxy = new (parent_task.allocate_child()) UT_TaskStateProxy();

        UT_TaskStateProxy *old;
        do
        {
            old = myWaitingTasks;
            if (old == plugged())
            {
                // List was plugged by markAsDone() after we checked it in
                // tryMarkAsBusy(), but before we got here.
                //
                // In tryMarkAsBusyFromDone(), it's possible myWaitingTasks is
                // still plugged() when a separate thread adds a waiting task.
                // Therefore, tasks which add waiting tasks should always
                // recycle themselves after adding them.
                parent_task.spawnChild(*proxy);
                return;
            }
            proxy->myNext = old;

            SYSstoreFence(); // Ensure the task state memory is stored
        }
        while (myWaitingTasks.compare_swap(old, proxy) != old);
    }
    /// Mark this node as being free. We walk through our waiting tasks and
    /// spawn them. Since the proxy tasks are just empty tasks, they will
    /// complete immediately and decrement the ref count of their parent task.
    /// If the ref count of the parent task goes down to 0, it then becomes
    /// runnable in the task scheduler.
    void
    markAsDone(UT_Task *parent_task, bool run_in_task_arena)
    {
        if (parent_task)
        {
            // Plug myWaitingTasks so that we move into the DONE state.
            UT_TaskStateProxy *proxy = myWaitingTasks.exchange(plugged());

            // Spawn off any tasks that were waiting while we were busy.
            while (proxy != nullptr)
            {
                UT_TaskStateProxy *next = proxy->myNext;
                parent_task->spawnChild(*proxy);
                proxy = next;
            }
            // Set myStatus to DONE so others can reclaim us.
            myStatus.exchange(DONE_NO_ARENA);
        }
        else
        {
            // Set myStatus to DONE so others can reclaim us.
            myStatus.exchange(run_in_task_arena ? DONE_WITH_ARENA : DONE_NO_ARENA);
        }
    }

    /// Non-threaded version of marking as done.
    void
    markAsDoneNoThread()
    {
        // Plug myWaitingTasks so that we move into the DONE state.
        // We should have no proxy tasks in this case!
        UT_VERIFY(myWaitingTasks.exchange(plugged()) == nullptr);

        // Set myStatus to DONE so that others can reclaim us.
        myStatus.exchange(DONE_NO_ARENA);
    }

    /// This does a fast (non-atomic) check of the status.
    TaskStatus
    relaxedLoadStatus() const
    {
        return (TaskStatus)myStatus.relaxedLoad();
    }

    using ArenaAndGroup = std::pair<UT_TaskArena,UT_TaskGroup>;

    void setAndRetainArenaGroup(ArenaAndGroup *p)
    {
        UT_ASSERT_P(myArenaAndGroupRefCount.relaxedLoad() > 0 &&
            (myStatus.relaxedLoad() == BUSY_WITH_ARENA || myStatus.relaxedLoad() == DONE_WITH_ARENA));
        myArenaAndGroup.relaxedStore(p);
    }
    ArenaAndGroup *getArenaGroup() const
    {
        UT_ASSERT_P(myArenaAndGroupRefCount.relaxedLoad() > 0 &&
            (myStatus.relaxedLoad() == BUSY_WITH_ARENA || myStatus.relaxedLoad() == DONE_WITH_ARENA));
        return myArenaAndGroup.relaxedLoad();
    }
    /// Decrements the reference count for myArenaAndGroup
    /// and deletes it if the reference count reaches zero.
    void decrefArenaGroup()
    {
        UT_ASSERT_P(myStatus.relaxedLoad() == DONE_WITH_ARENA);
        if (myArenaAndGroupRefCount.add(-1) == 0)
        {
            // NOTE: This must be an atomic exchange, since there is a race
            // condition where two threads can both get a ref count of zero
            // (one of which incremented and immediately decremented in
            // tryMarkAsBusy()).
            auto *arena_group = myArenaAndGroup.exchange(nullptr);
            delete arena_group;
        }
    }
private:
    SYS_AtomicInt32 myStatus;
    SYS_AtomicInt32 myArenaAndGroupRefCount;
    union {
        SYS_AtomicPtr<ArenaAndGroup>     myArenaAndGroup;
        SYS_AtomicPtr<UT_TaskStateProxy> myWaitingTasks;
    };
};

#endif // __UT_TASKSTATE_H_INCLUDED__
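
The sketch below is not part of the header; it illustrates the intended
claim/wait/done protocol for the non-arena path, as described by the comments
above. evaluateNodeOnce() and evaluateNode() are hypothetical names, and the
arena path (which publishes an ArenaAndGroup via setAndRetainArenaGroup() and
releases it with decrefArenaGroup()) is omitted.

#include "UT_Task.h"
#include "UT_TaskState.h"

// Hypothetical stand-in for the actual evaluation of the guarded node.
static void evaluateNode() { /* ... compute the node's result ... */ }

// Called from inside a UT_Task ('current_task') that needs the node guarded
// by 'state' to be evaluated exactly once across threads.
static void
evaluateNodeOnce(UT_TaskState &state, UT_Task &current_task)
{
    switch (state.tryMarkAsBusy(/*run_in_task_arena=*/false))
    {
    case UT_TaskState::FREE:
        // This thread won the claim: do the work, then spawn any queued
        // proxy tasks and publish DONE_NO_ARENA so that waiters wake up.
        evaluateNode();
        state.markAsDone(&current_task, /*run_in_task_arena=*/false);
        break;

    case UT_TaskState::BUSY_NO_ARENA:
        // Another thread owns the evaluation. Queue a proxy child of
        // current_task; when the owner calls markAsDone(), the proxy is
        // spawned, completes immediately, and drops current_task's ref
        // count so the scheduler can run it again. Per the comment in
        // addWaitingTask(), current_task should also recycle itself.
        state.addWaitingTask(current_task);
        break;

    default:
        // DONE_NO_ARENA / DONE_WITH_ARENA: the result is already available.
        break;
    }
}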