HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PDG_EventQueue.h
Go to the documentation of this file.
1 /*
2  * PROPRIETARY INFORMATION. This software is proprietary to
3  * Side Effects Software Inc., and is not to be reproduced,
4  * transmitted, or disclosed in any way without written permission.
5  *
6  * COMMENTS:
7  */
8 
9 #ifndef __PDG_EventQueue_h__
10 #define __PDG_EventQueue_h__
11 
12 #include "PDG_API.h"
13 #include "PDG_Event.h"
14 #include "PDG_EventTypes.h"
15 
16 #include <UT/UT_Array.h>
17 #include <UT/UT_ConcurrentQueue.h>
19 #include <UT/UT_UniquePtr.h>
20 
21 #include <SYS/SYS_AtomicInt.h>
22 
23 #define PDG_EVENT_QUEUE_STATS 0
24 
25 class PDG_EventEmitter;
26 class PDG_EventHandler;
27 
28 class UT_Thread;
29 
30 /* Event queue that dispatches PDG_Events to the handlers registered for
31  * the PDG_EventEmitter that produced them.
32  */
34 {
35 public:
36  /// Scoped object that pauses event handling. Pausing is implemented with
37  /// an atomic counter on the queue, so nested ScopedPause objects work as
38  /// expected. Each scoped pause increments the pause request counter, so
39  /// event processing will only proceed once all ScopedPause objects are
40  /// unwound.
42  {
43  public:
45  : myQueue(queue)
46  {
47  queue->pause();
48  }
49 
51  {
52  myQueue->resume();
53  }
54 
55  private:
56  PDG_EventQueue* myQueue;
57  };
58 
59 public:
60  /// Constructs a new event queue
61  PDG_EventQueue(bool background);
62  ~PDG_EventQueue();
63 
64  /// Terminates the queue, waits for the event thread to finish, and
65  /// unsets the queue from all event emitters registered with this event
66  /// queue instance.
67  void terminate();
68 
69  /// Queues an event that will eventually be dispatched by the background
70  /// thread running in this event queue instance.
71  void queueEvent(
72  const PDG_EventEmitter* emitter,
73  const PDG_Event& event);
74 
75  /// Emits an event immediately on the calling thread
76  void emitEvent(
77  const PDG_EventEmitter* emitter,
78  const PDG_Event& event) const;
79 
80  /// Waits for all queued events to finish, and prevents new events from
81  /// being added until that condition is reached. Blocks the caller. If
82  /// an event emitter is passed into this function, it gets deregistered
83  /// before unblocking the event queue.
84  void waitAllEvents(PDG_EventEmitter* emitter=nullptr);
85 
86  /// Waits until the specified emitter is registered, to ensure that all
87  /// events for that emitter have been flushed
88  void waitEmitter(const PDG_EventEmitter* emitter);
89 
90  /// Returns whether or not the event queue attempts to consolidate
91  /// consecutive events of the same type
92  inline bool isConsolidateEvents() const
93  { return myIsConsolidateEvents; }
94 
95  /// Configures the event consolidation flag
96  inline void setConsolidateEvents(bool consolidate)
97  { myIsConsolidateEvents = consolidate; }
98 
99 private:
100  friend class PDG_EventEmitter;
101 
102  struct EmitterInfo
103  {
104  PDG_EventHandlerArray myHandlers;
105  PDG_EventFilterArray myFilters;
106 
107  void add(
108  PDG_EventHandler* handler,
109  const PDG_EventFilter& filter,
110  PDG_EventFilterMap& filter_map);
111  bool remove(
112  PDG_EventHandler* handler,
113  PDG_EventFilterMap& filter_map);
114  void removeAll(
115  PDG_EventHandlerArray& removed,
116  PDG_EventFilterMap& filter_map,
117  bool user_only);
118  void emitEvent(const PDG_Event& event) const;
119  };
120 
121  using EventInfo = std::pair<const PDG_EventEmitter*, PDG_Event>;
122  using EventArray = UT_Array<EventInfo>;
123  using EventQueue = UT_ConcurrentQueue<EventInfo>;
124  using EmitterMap = UT_ConcurrentHashMap<PDG_EventEmitter*, EmitterInfo>;
125  using EmitterArrayMap = UT_UniquePtr<EmitterMap[]>;
126 
127  class EventFunctor
128  {
129  public:
130  EventFunctor(
132  const EventArray& events)
133  : myQueue(queue)
134  , myEvents(events) {}
135  ~EventFunctor() {}
136 
137  void operator()(
138  const UT_BlockedRange<int64> &range) const
139  {
140  for (int64 i = range.begin();
141  i != range.end();
142  ++i)
143  {
144  myQueue->processEvent(myEvents[i]);
145  }
146  }
147  private:
148  PDG_EventQueue* myQueue;
149  const EventArray& myEvents;
150  };
151 
152 private:
153 
154  exint emitterIndex(const PDG_EventEmitter* emitter) const;
155 
156  void registerEmitter(PDG_EventEmitter* emitter);
157  void deregisterEmitter(PDG_EventEmitter* emitter);
158 
159  bool addEventHandler(
160  PDG_EventEmitter* emitter,
161  PDG_EventHandler* handler,
162  const PDG_EventFilter& filter,
163  PDG_EventFilterMap& filter_map);
164  void removeEventHandler(
165  PDG_EventEmitter* emitter,
166  PDG_EventHandler* handler,
167  PDG_EventFilterMap& filter_map);
168  void removeAllEventHandlers(
169  PDG_EventEmitter* emitter,
170  PDG_EventFilterMap& filter_map,
171  bool user_only);
172  void findEventHandlers(
173  const PDG_EventEmitter* emitter,
174  PDG_EventHandlerArray& handlers) const;
175 
176  void processQueueStats(int print_limit);
177  void processEvent(const EventInfo& info);
178  void processEvents();
179 
180  inline void pause()
181  { myPauseRequests.add(1); }
182  inline void resume()
183  { myPauseRequests.add(-1); }
184 
185  static void* runThread(void* param);
186 
187 private:
188  EventQueue myEvents;
189  EmitterArrayMap myEmitters;
190 
191  UT_Thread* myThread;
192 
193 #if PDG_EVENT_QUEUE_STATS
194  exint myDropEventCount;
195  exint myQueuedEventCount;
196  exint myPoppedEventCount;
197  exint myProcessedEventCount;
198  exint myQueueLogTime;
199 #endif
200 
201  exint myEmitterMapSize;
202  SYS_AtomicInt32 myPauseRequests;
203 
204  bool myIsTerminating;
205  bool myIsWaiting;
206  bool myIsProcessing;
207  bool myIsConsolidateEvents;
208 };
209 
210 #endif
GLenum GLint * range
Definition: glcorearb.h:1925
#define PDG_API
Definition: PDG_API.h:23
int64 exint
Definition: SYS_Types.h:125
bool isConsolidateEvents() const
std::unique_ptr< T, Deleter > UT_UniquePtr
A smart pointer for unique ownership of dynamically allocated objects.
Definition: UT_UniquePtr.h:39
struct _cl_event * event
Definition: glcorearb.h:2961
*get result *(waiting if necessary)*A common idiom is to fire a bunch of sub tasks at the queue
Definition: thread.h:623
long long int64
Definition: SYS_Types.h:116
GLenum GLfloat param
Definition: glcorearb.h:104
void setConsolidateEvents(bool consolidate)
Configures the event consolidation flag.
void pause(int delay) noexcept
Definition: thread.h:102
ScopedPause(PDG_EventQueue *queue)
ImageBuf OIIO_API add(Image_or_Const A, Image_or_Const B, ROI roi={}, int nthreads=0)
Declare prior to use.
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition: glcorearb.h:1297