HDK
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PDG_Scheduler.h
Go to the documentation of this file.
1 /*
2  * PROPRIETARY INFORMATION. This software is proprietary to
3  * Side Effects Software Inc., and is not to be reproduced,
4  * transmitted, or disclosed in any way without written permission.
5  *
6  * COMMENTS:
7  */
8 
9 #ifndef __PDG_SCHEDULER_H__
10 #define __PDG_SCHEDULER_H__
11 
12 #include "PDG_API.h"
13 
14 #include "PDG_AttributeFile.h"
15 #include "PDG_AttributePrimitive.h"
16 #include "PDG_BatchWorkItem.h"
17 #include "PDG_CookOptions.h"
18 #include "PDG_EventEmitter.h"
19 #include "PDG_EventTypes.h"
20 #include "PDG_File.h"
21 #include "PDG_FileUtils.h"
22 #include "PDG_NodeInterface.h"
23 #include "PDG_SchedulerTypes.h"
24 #include "PDG_WorkItemSort.h"
25 
26 #include <PDGT/PDGT_ValueArgs.h>
27 
28 #include <UT/UT_Array.h>
31 #include <UT/UT_ConcurrentQueue.h>
32 #include <UT/UT_ConcurrentSet.h>
33 #include <UT/UT_Map.h>
34 #include <UT/UT_Options.h>
35 #include <UT/UT_StringArray.h>
36 #include <UT/UT_StringHolder.h>
37 #include <UT/UT_TBBSpinLock.h>
38 
39 #include <time.h>
40 
41 class PDG_CookState;
42 class PDG_GraphContext;
43 class PDG_Node;
44 class PDG_Service;
45 class PDG_SchedulerType;
46 class PDG_WorkItem;
47 
48 class UT_JSONValue;
49 class UT_JSONWriter;
50 
51 namespace PDGN
52 {
53 class PDGN_PollingClientNNG;
54 }
55 
56 /*
57  * Base class for scheduler nodes, which process work items and run them
58  * either in process or out of process
59  */
61  public PDG_EventEmitter,
62  public PDG_TypeInstance,
64 {
65 public:
66  /*
67  * Enumeration of possible return values from onScheduler callback
68  */
70  {
71  /// The work item was cooked, and should be marked as canceled
73 
74  /// The work item was cooked, and should be marked as failed
76 
77  /// The work item was cooked, and should be marked as succeeded
79 
80  /// The call to schedule the work item failed
82 
83  /// The call to schedule the work item succeeded
85 
86  /// The scheduler cannot handle the work item at this time
88 
89  /// The scheduler cannot handle any work items at this time, and
90  /// all further scheduling requests will be deferred
91  eFullDeferred
92  };
93 
94  /*
95  * Enumeration of possible return values from onTick callback
96  */
98  {
99  /// The scheduler is ready to accept work items
101 
102  /// The scheduler is busy, and cannot process additional work items
104 
105  /// The scheduler has encountered some sort of fatal error or request
106  /// to cancel, and will not longer accept work items for the remainder
107  /// of the cook.
108  eSchedulerCancelCook
109  };
110 
111  /*
112  * Enumeration of possible return values from onAcceptWorkItem callback
113  */
115  {
116  /// The scheduler is able to handle the specified work item
118 
119  /// The scheduler is not able to handle the specified work item
121 
122  /// The scheduler cannot tell if can/can't schedule the specified
123  /// work item, and the default logic for checking should be used
124  /// instead
125  eSchedulerDefault
126  };
127 
128  /*
129  * Enumeration of work item data transfer modes
130  */
132  {
133  /// Work item data is transfered to the farm using a shared
134  /// drive, by writing it to .json file on disk.
136 
137  /// Work item data is transfered via RPC
138  eSourceRPCMessage
139  };
140 
141  /*
142  * Enumeration of temp dir cleanup options
143  */
145  {
146  /// The temp file dir is not cleaned up automatically
148 
149  /// The temp file dir is cleaned up when the scheduler is removed or
150  /// the session shuts down
152 
153  /// The temp file dir is cleaned up after the scheduler finishes
154  /// cooking
155  eCleanupCook
156  };
157 
158  /// The key for the name of the scheduler when writing it to JSON
160 
161  /// The key for the type of the scheduler when writing it to JSON
163 
164  /// The key for the parameters of the scheduler when writing them to JSON
166 
167 public:
169  const PDG_BaseType* type,
170  const PDGT_ValueArgs& extra_args,
171  const UT_StringHolder& name);
172  ~PDG_Scheduler() override;
173 
174  /// Returns the list of supported event types
175  const PDG_EventFilter& supportedEventTypes() const override;
176 
177  int64 getMemoryUsage(bool inclusive) const override;
178 
179  /// Debug name method, required by PDGE_DependencyOwner
180  UT_StringHolder debugName() const override
181  { return name(); }
182 
183  /// Resets the dependency owner
184  void resetOwner() override;
185 
186  /// Returns the is cooked dep for this scheduler
188  { return &myIsCookedDep; }
189 
190  /// Returns the is cooked dep for this scheduler
192  { return &myIsSetupDep; }
193 
194  /// Returns the name of the shceudler
195  const UT_StringHolder& name() const
196  { return myName; }
197 
198  /// Called when the type object should reload itself because of a change
199  /// to the underlying Python module
200  bool reloadInstance(UT_WorkBuffer& errors) override;
201 
202  /// Returns the node interface/template that describes the default
203  /// parameters and connections for the scheduler
204  const PDG_NodeInterface* templateInterface() const override;
205 
206  /// Returns true if the scheduler is able to queue the specified work item,
207  /// else false.
208  bool canSchedule(const PDG_WorkItem* work_item);
209 
210  /// Called when the scheduler should process a static dependency graph
211  virtual void onScheduleStatic(
212  const PDG_WorkItemMap& dependencies,
213  const PDG_WorkItemMap& dependents,
214  const PDG_WorkItemArray& ready_items) = 0;
215 
216  /// Called whenever the scheduler should schedule a work item. This is a
217  /// required callback.
218  virtual ScheduleResult onSchedule(PDG_WorkItem* work_item) = 0;
219 
220  /// Called when a specific work item should be canceled
221  virtual void onCancelWorkItems(
222  const PDG_WorkItemArray& work_item,
223  PDG_Node* node) = 0;
224 
225  /// Called when the scheduler is first started, should be used to initialize
226  /// scheduler state, threads, etc. This is an optional callback.
227  virtual bool onStart() = 0;
228 
229  /// Called when the scheduler is stopped, to clean up any resources held
230  /// by the scheduler (sockets, threads, etc). Optional callback.
231  virtual bool onStop() = 0;
232 
233  /// Called when cooking begins
234  virtual bool onStartCook(
235  bool static_cook,
236  const PDG_NodeSet&) = 0;
237 
238  /// Called when cooking completes or is canceled
239  virtual bool onStopCook(bool cancel) = 0;
240 
241  /// Called once at the start of the cook, to allow the scheduler to
242  /// set any custom cook options
243  virtual void onConfigureCook(
244  PDG_CookOptions* cook_ptions) = 0;
245 
246  /// Called at the before cooking on the background thread to setup
247  /// anything needed for a cook.
248  virtual bool onSetupCook() = 0;
249 
250  /// Called periodically during cooking to update state
251  virtual TickResult onTick() = 0;
252 
253  /// Returns an enum indicating whether or not the scheduler accepts the
254  /// specified work item
255  virtual AcceptResult onAcceptWorkItem(
256  const PDG_WorkItem* work_item) = 0;
257 
258  /// Custom file transfer logic, implemented per-scheduler
259  virtual bool onTransferFile(
260  const UT_StringHolder& file_path);
261 
262  /// Transfers a file for a work item that uses this scheduler, using the
263  /// transfer settings on the scheduler itself.
264  bool transferFile(
265  const PDG_WorkItem* work_item,
266  const PDG_File& file,
267  PDG_FileUtils::TransferType default_type,
268  bool check_type,
269  UT_WorkBuffer& errors) const;
270 
271  /// Transfers a file from the local machine to the machine that will be
272  /// executing processor tasks, e.g. a remote machine on the farm.
273  bool transferFile(
274  const UT_StringHolder& file_path,
275  PDG_FileUtils::TransferType default_type,
276  bool check_type,
277  UT_WorkBuffer& errors) const;
278 
279  /// Transfers a file from the local machine to the machine that will be
280  /// executing processor tasks, e.g. a remote machine on the farm. This
281  /// variant of the method preserves the directory structure relative to
282  /// the specified root directory
283  bool transferFile(
284  const UT_StringHolder& root_path,
285  const UT_StringHolder& file_path,
286  const UT_StringHolder& sub_path,
287  PDG_FileUtils::TransferType default_type,
288  bool check_type,
289  UT_WorkBuffer& errors) const;
290 
291  /// Returns the path on the scheduler that a local file will be copied
292  /// to, using the standard file transfer mechanism
293  UT_StringHolder formatTransferPath(
294  const UT_StringHolder& local_path,
295  PDG_FileUtils::TransferType default_type,
296  bool local,
297  bool check_type) const;
298 
299  /// Returns the path on the scheduler that a local file will be copied
300  /// to. Unlike the above method, this method preserves the directory
301  /// structure relative to the specified root directory
302  UT_StringHolder formatTransferPath(
303  const UT_StringHolder& root_path,
304  const UT_StringHolder& local_path,
305  const UT_StringHolder& sub_path,
306  PDG_FileUtils::TransferType default_type,
307  bool local,
308  bool check_type) const;
309 
310  /// Returns the local version of the given path.
311  UT_StringHolder localizePath(
312  const UT_StringHolder& deloc_path) const;
313 
314  /// Returns the delocalized (remote) version of the given local path.
315  UT_StringHolder delocalizePath(
316  const UT_StringHolder& local_path) const;
317 
318  /// Returns the job name for the specified work item
319  UT_StringHolder jobName(const PDG_WorkItem* work_item) const;
320 
321  /// Expands special tokens in the command string
322  virtual UT_StringHolder expandCommandTokens(
323  const UT_StringHolder& command,
324  const PDG_WorkItem* work_item) = 0;
325 
326  /// cook the output node in the graph context of the given file as a single job
327  virtual UT_StringHolder submitAsJob(
328  const UT_StringHolder& graph_file,
329  const UT_StringHolder& node_name) = 0;
330 
331  /// set the working directory local and network path, equivalent
332  /// to the __PDG_DIR__
333  void setWorkingDir(
334  const UT_StringHolder& local_path,
335  const UT_StringHolder& remote_path);
336  /// set the temp directory local and network path, equivalent
337  /// to the __PDG_TEMP__
338  void setTempDir(
339  const UT_StringHolder& local_path,
340  const UT_StringHolder& remote_path);
341  /// set the script directory local and network path, equivalent
342  /// to the __PDG_SCRIPTDIR__
343  void setScriptDir(
344  const UT_StringHolder& local_path,
345  const UT_StringHolder& remote_path);
346 
347  /// sets whether or not the scheduler accepts in-process work items
348  void setAcceptInProcess(bool in_process);
349 
350  /// returns the working directory (network) path, equivalent
351  /// to the __PDG_DIR__ command token when local is false.
352  /// When local is true, returns the absolute path to the shared root
353  /// on the local file system
354  UT_StringHolder workingDir(bool local) const;
355 
356  /// returns the temp directory (network) path, equivalent
357  /// to the __PDG_TEMP__ command token when local is false.
358  /// When local is true, returns the absolute path to the shared temp
359  /// on the local file system
360  UT_StringHolder tempDir(bool local) const;
361 
362  /// returns the temp script directory (network) path, equivalent
363  /// to the __PDG_SCRIPTDIR__ command token when local is false.
364  /// When local is true, returns the absolute path to the temp
365  /// script on the local file system
366  UT_StringHolder scriptDir(bool local) const;
367 
368  /// Returns the path to the directory where log files generated during the
369  /// cook should be written.
370  /// When local is true, returns the aboslute path on the local file system.
371  UT_StringHolder logDir(bool local) const;
372 
373  /// Returns the path to the directory where work items are serialized
374  /// before their commands are executed.
375  /// When local is true, returns the aboslute path on the local file system.
376  UT_StringHolder dataDir(bool local) const;
377 
378  /// Returns the path to an application for the given name, for example
379  /// "python", "hython"
380  virtual UT_StringHolder applicationBin(
381  const UT_StringHolder& name,
382  const PDG_WorkItem* work_item);
383 
384  /// returns the server endpoint for work item results, in format <HOST>:<PORT>,
385  /// equivalent to the __PDG_RESULT_SERVER__ command token
386  virtual UT_StringHolder workItemResultServerAddr() = 0;
387 
388  /// returns a URI for the workitem cook log (text file)
389  virtual UT_StringHolder getLogURI(const PDG_WorkItem* work_item) = 0;
390 
391  /// returns a URI for the workitem status page if relevant
392  virtual UT_StringHolder getStatusURI(const PDG_WorkItem* work_item) = 0;
393 
394  /// Terminate the give shared server
395  virtual bool endSharedServer(
396  const UT_StringHolder& sharedserver_name) = 0;
397  /// Clears metadata associated with the given shared server name
398  virtual void clearSharedServerInfo(
399  const UT_StringHolder &sharedserver_name);
400  /// Sets metadata associated with the given shared server name, returns full name
401  virtual UT_StringHolder setSharedServerInfo(
402  const UT_StringHolder &sharedserver_name,
403  const PDGT_ValueArgs& info);
404  /// Gets metadata associated with the given shared server name
405  PDGT_ValueArgs getSharedServerInfo(
406  const UT_StringHolder &sharedserver_name);
407  /// Gets the list of shared servers
408  UT_StringArray getSharedServers();
409 
410  /// Delete the temporary directory and all its contents
411  virtual void cleanTempDirectory();
412 
413  /// Called to stop the scheduler
414  void stop(bool exiting);
415 
416  /// Called when a cook begins with the cook state data for that cook
417  bool startCook(PDG_CookState& cook_state);
418 
419  /// Called when a cook stops
420  void stopCook();
421 
422  /// Cancels an active cook
423  void cancelCook(bool pause);
424 
425  /// Ticks the scheduler
426  bool tick(PDG_Scheduler::TickResult& tick_result);
427 
428  /// Cooks a specific work item using it's internal cook method
429  bool cookWorkItem(PDG_WorkItem* work_item);
430 
431 
432  /// Submits a static cook with this scheduler
433  void submitStatic();
434 
435 
436  /// Queues a work item and returns a state that represents the result of
437  /// the operation
438  virtual PDG_WorkItemState queueWorkItem(PDG_WorkItem* work_item);
439 
440  /// Removes a work item from any internal tracking lists
441  void removeWorkItem(const PDG_WorkItem* work_item);
442 
443 
444  /// Cancels a cook for a specific work tiem
445  void cancelWorkItem(PDG_WorkItem* work_item);
446 
447  /// Cancels a cook for a specific node
448  void cancelNode(PDG_Node* node);
449 
450  void onWorkItemPriorityChanged(
451  PDG_WorkItem* work_item);
452  void process(bool skip_ready_items);
453 
454  bool isRunning(int count=0);
455  bool isRunning(const PDG_WorkItem* work_item);
456  bool isDefault();
457 
458  bool isWaitForFailures() const;
459  bool isValidateOutputFiles() const;
460  bool isCheckExpectedOutputFiles() const;
461  bool isCompressWorkItemData() const;
462  virtual PDGN::PDGN_PollingClientNNG*
463  getPollingClient() { return nullptr; }
464 
465  WorkItemDataSource workItemDataSource() const;
466 
467  int numRunningItems() const;
468  int numFailedItems() const;
469  int numQueuedItems() const;
470 
471  void setContext(PDG_GraphContext* context);
472  PDG_GraphContext* context() const;
473 
474  void dependencyGraph(
475  PDG_WorkItemMap& dependencies,
476  PDG_WorkItemMap& dependents,
477  PDG_WorkItemArray& ready,
478  bool expand);
479 
480  /// Starts a service using this scheduler
481  virtual bool startService(
482  UT_WorkBuffer& errors,
483  PDG_Service* service)
484  { return false; }
485 
486  /// Stops a service that was started with this scheduler
487  virtual bool stopService(
488  UT_WorkBuffer& errors,
489  PDG_Service* service)
490  { return false; }
491 
492  template <typename T, typename D=T>
494  T& result,
495  PDG_NodeInterface* node,
496  const UT_StringHolder& prefix,
497  const UT_StringHolder& parm,
498  PDG_WorkItem* work_item,
499  const D& default_value,
500  UT_WorkBuffer& errors) const
501  {
502  UT_WorkBuffer parm_name;
503  UT_WorkBuffer toggle_name;
504 
505  if (prefix.isstring() && prefix.length() > 0)
506  {
507  toggle_name.format("{}_override_toggle", templateName());
508  parm_name.format("{}_{}", prefix, parm);
509  }
510  else
511  {
512  parm_name.append(parm);
513  }
514 
515  // If the user supplied us with a batch item, evaluate against the first
516  // sub item instead
517  PDG_WorkItem* eval_work_item = work_item;
518  if (work_item && work_item->isBatch())
519  {
520  auto batch_item =
521  static_cast<PDG_BatchWorkItem*>(work_item);
522  if (batch_item->batchSize() > 0)
523  eval_work_item = batch_item->batchItems()[0];
524  }
525 
526  result = default_value;
527  PDG_Port* port = parameter(parm_name.buffer());
528  if (port)
529  {
530  if (!port->evaluate(0, result, eval_work_item, errors))
531  return false;
532  }
533 
534  if (!node)
535  return true;
536 
537  port = node->parameter(toggle_name.buffer());
538 
539  if (port)
540  {
541  exint toggle_result;
542  port->evaluate(0, toggle_result, eval_work_item, errors);
543 
544  if (toggle_result < 1)
545  return true;
546  }
547 
548  port = node->parameter(parm_name.buffer());
549  if (port)
550  {
551  if (!port->evaluate(0, result, eval_work_item, errors))
552  return false;
553  }
554 
555  return true;
556  }
557 
558 
559  void onWorkItemSetStringArray(
560  PDG_WorkItemID work_item_id,
561  int index,
562  const UT_StringHolder& attribute_name,
564  void onWorkItemSetFloatArray(
565  PDG_WorkItemID work_item_id,
566  int index,
567  const UT_StringHolder& attribute_name,
569  void onWorkItemSetIntArray(
570  PDG_WorkItemID work_item_id,
571  int index,
572  const UT_StringHolder& attribute_name,
574  void onWorkItemSetFileArray(
575  PDG_WorkItemID work_item_id,
576  int index,
577  const UT_StringHolder& attribute_name,
579  void onWorkItemSetDictArray(
580  PDG_WorkItemID work_item_id,
581  int index,
582  const UT_StringHolder& attribute_name,
584 
585  void onWorkItemSetPyObject(
586  PDG_WorkItemID work_item_id,
587  int index,
588  const UT_StringHolder& attribute_name,
589  const UT_StringHolder& pyobject_repr);
590  void onWorkItemSetString(
591  PDG_WorkItemID work_item_id,
592  int index,
593  const UT_StringHolder& attribute_name,
594  const UT_StringHolder& value,
595  int attrib_index);
596  void onWorkItemSetFloat(
597  PDG_WorkItemID work_item_id,
598  int index,
599  const UT_StringHolder& attribute_name,
600  fpreal value,
601  int attrib_index);
602  void onWorkItemSetInt(
603  PDG_WorkItemID work_item_id,
604  int index,
605  const UT_StringHolder& attribute_name,
606  exint value,
607  int attrib_index);
608  void onWorkItemSetFile(
609  PDG_WorkItemID work_item_id,
610  int index,
611  const UT_StringHolder& attribute_name,
613  int attrib_index);
614  void onWorkItemSetDict(
615  PDG_WorkItemID work_item_id,
616  int index,
617  const UT_StringHolder& attribute_name,
618  const UT_OptionsHolder& dict_repr,
619  int attrib_index);
620 
621  void onWorkItemAddOutput(
622  PDG_WorkItemID work_item_id,
623  int index,
624  const UT_StringHolder& path,
625  const UT_StringHolder& tag,
626  PDG_File::Hash hash_code,
627  bool active_only);
628  void onWorkItemAddOutputs(
629  PDG_WorkItemID work_item_id,
630  int index,
631  const UT_StringArray& paths,
632  const UT_StringHolder& tags,
633  const PDG_File::HashArray& hashes,
634  bool active_only);
635  void onWorkItemAddOutputs(
636  PDG_WorkItemID work_item_id,
637  int index,
638  const UT_StringArray& paths,
639  const UT_StringArray& tags,
640  const PDG_File::HashArray& hashes,
641  bool active_only);
642  void onWorkItemInvalidateCache(
643  PDG_WorkItemID work_item_id,
644  int index);
645 
646  void onWorkItemSucceeded(
647  PDG_WorkItemID work_item_id,
648  int index,
649  fpreal cook_duration);
650  void onWorkItemFailed(
651  PDG_WorkItemID work_item_id,
652  int index);
653  void onWorkItemCanceled(
654  PDG_WorkItemID work_item_id,
655  int index);
656  void onWorkItemStartCook(
657  PDG_WorkItemID work_item_id,
658  int index,
659  bool clear_outputs);
660 
661  void onWorkItemSetCustomState(
662  PDG_WorkItemID work_item_id,
663  int index,
664  const UT_StringHolder& custom_state);
665  void onWorkItemSetCookPercent(
666  PDG_WorkItemID work_item_id,
667  int index,
668  fpreal &cook_percent);
669  void onWorkItemAppendLog(
670  PDG_WorkItemID work_item_id,
671  int index,
672  const UT_StringHolder& log_data,
673  PDG_WorkItemLogType log_type);
674 
675  PDG_WorkItemEvalState isWorkItemReady(
676  PDG_WorkItemID work_item_id,
677  int index);
678 
679  /// Emits errors or warnings from the scheduler as PDG_Events
680  void addError(
681  const UT_StringHolder& message) const override;
682  void addWarning(
683  const UT_StringHolder& message) const override;
684 
685  /// Writes the current parameter value configuration of this scheduler
686  /// to JSON
687  bool asJSON(
688  UT_JSONWriter& writer,
689  bool skip_defaults) const;
690 
691  /// Reads and constructs a scheduler from JSON and returns the instance
692  static PDG_Scheduler* fromJSON(
693  const UT_JSONValue* value,
694  PDG_GraphContext* context,
695  UT_WorkBuffer& errors);
696 
697  virtual fpreal tickPeriod() const;
698  virtual int maxItemsPerTick() const;
699  virtual int maxConcurrentTasks() const;
700 
701  PDG_PathMappingMode mapMode() const;
702 
703  PDG_FileTransferType transferType() const;
704  UT_StringHolder transferRoot() const;
705 
706 protected:
707  /// Find the workitem and report error if not found
708  PDG_WorkItem* workItemChecked(
709  PDG_WorkItemID work_item_id,
710  int index);
711 
713  PDGE_Resolutions& resolutions,
714  const PDGE_Evaluator& evaluator,
715  PDGE_Dependency* dependency) override;
716 
717 private:
718  struct Result
719  {
720  PDG_WorkItemState myState;
721  PDG_WorkItemID myId;
722  fpreal myDuration;
723  int myIndex;
724  bool myUnresolve;
725  };
726 
727  using ResultMap = UT_Map<PDG_WorkItemID, Result>;
728  using ResultQueue = UT_ConcurrentQueue<Result>;
729 
730  using PriorityQueue =
733  using WorkItemSet = UT_ConcurrentSet<PDG_WorkItemID>;
734  using NodeSet = UT_ConcurrentSet<PDG_Node*>;
735  using SharedServerInfoMap =
737 
738 private:
739  ScheduleResult schedule(
740  PDG_WorkItem* work_item,
741  UT_Array<Result>& pending);
742  int markCompleted(
743  const Result& result,
744  UT_Array<Result>& pending);
745  PDG_WorkItem* workItem(
746  PDG_WorkItemID work_item_id,
747  int index);
748  void updateDefaultDirectories();
749 
750 private:
751  PDGE_Dependency myIsCookedDep;
752  PDGE_Dependency myIsSetupDep;
753 
754  PriorityQueue myScheduleQueue;
755  ResultQueue myScheduleResults;
756  ResultMap myPauseResults;
757  SharedServerInfoMap mySharedServerInfo;
758  PDG_WorkItemIDSet myPausedItems;
759 
760  WorkItemSet myRunningItems;
761  WorkItemSet myFailedItems;
762  WorkItemSet myLongRunningItems;
763 
764  WorkItemSet myCanceledItems;
765  NodeSet myCanceledNodes;
766 
767  UT_StringHolder myName;
768  UT_StringHolder myLocalWorkingDir;
769  UT_StringHolder myLocalTempDir;
770  UT_StringHolder myLocalScriptDir;
771  UT_StringHolder myRemoteWorkingDir;
772  UT_StringHolder myRemoteTempDir;
773  UT_StringHolder myRemoteScriptDir;
774 
775  PDG_GraphContext* myContext;
776  const PDG_SchedulerType* myTypeObject;
777 
778  time_t myLastTick;
779  WorkItemDataSource myDataSource;
780  TempDirCleanup myTempDirCleanup;
781 
782  UT_TBBSpinLock myProcessLock;
783 
784  bool myCancelingFlag;
785  bool myPausingFlag;
786  bool myStaticCook;
787  bool myInProcess;
788 
789  bool myIsWaitForFailures;
790  bool myIsValidateOutputFiles;
791  bool myIsCheckExpectedOutputFiles;
792  bool myIsCompressWorkItemData;
793 };
794 
795 #endif /* __PDG_SCHEDULER_H__ */
exint PDG_WorkItemID
Type defs for unique work item IDs.
The work item was cooked, and should be marked as succeeded.
Definition: PDG_Scheduler.h:78
GLuint GLsizei const GLchar * message
Definition: glcorearb.h:2543
static const UT_StringHolder theTypeKey
The key for the type of the scheduler when writing it to JSON.
const UT_StringHolder & name() const
Returns the name of the shceudler.
virtual bool startService(UT_WorkBuffer &errors, PDG_Service *service)
Starts a service using this scheduler.
The scheduler is able to handle the specified work item.
GLsizei const GLfloat * value
Definition: glcorearb.h:824
PDGE_Dependency * isSetupDep()
Returns the is cooked dep for this scheduler.
Comparator< PriorityComparator, Reverse > Priority
Functor that compares two work item references or pointers by priority.
#define PDG_API
Definition: PDG_API.h:23
GLsizei const GLchar *const * path
Definition: glcorearb.h:3341
The scheduler is not able to handle the specified work item.
int64 exint
Definition: SYS_Types.h:125
SYS_FORCE_INLINE const char * buffer() const
The call to schedule the work item succeeded.
Definition: PDG_Scheduler.h:84
The temp file dir is not cleaned up automatically.
The work item was cooked, and should be marked as failed.
Definition: PDG_Scheduler.h:75
Class which writes ASCII or binary JSON streams.
Definition: UT_JSONWriter.h:37
**But if you need a result
Definition: thread.h:622
bool evaluateOverride(T &result, PDG_NodeInterface *node, const UT_StringHolder &prefix, const UT_StringHolder &parm, PDG_WorkItem *work_item, const D &default_value, UT_WorkBuffer &errors) const
virtual void addWarning(const UT_StringHolder &message) const
Adds a warning to the node interface – implemented in subclasses.
static const UT_StringHolder theParametersKey
The key for the parameters of the scheduler when writing them to JSON.
PDG_WorkItemEvalState
virtual bool reloadInstance(UT_WorkBuffer &errors)
The call to schedule the work item failed.
Definition: PDG_Scheduler.h:81
virtual PDGN::PDGN_PollingClientNNG * getPollingClient()
GLint GLint GLsizei GLint GLenum GLenum type
Definition: glcorearb.h:108
int64 Hash
The file hash/modtime type.
Definition: PDG_File.h:39
exint length() const
The work item was cooked, and should be marked as canceled.
Definition: PDG_Scheduler.h:72
bool isBatch() const
Returns true if the work tiem is a batch.
PDG_WorkItemState
Enum of possible work item states.
PDG_FileTransferType
const UT_StringHolder & templateName() const
virtual void addError(const UT_StringHolder &message) const
Adds an error to the node interface – implemented in subclasses.
long long int64
Definition: SYS_Types.h:116
tbb::concurrent_hash_map< K, T, H, A > UT_ConcurrentHashMap
virtual int64 getMemoryUsage(bool inclusive) const
Returns the memory usage of this owner instance.
PDG_PathMappingMode
Enumeration of path mapping modes available on the scheduler.
GLuint const GLchar * name
Definition: glcorearb.h:786
virtual const PDG_NodeInterface * templateInterface() const
PDG_WorkItemLogType
Enumeration of work item log message types.
size_t format(const char *fmt, const Args &...args)
TransferType
Enumeration of file transfer destination paths.
Definition: PDG_FileUtils.h:84
PDG_Port * parameter(const UT_StringHolder &name, int multi=-1) const
virtual bool stopService(UT_WorkBuffer &errors, PDG_Service *service)
Stops a service that was started with this scheduler.
fpreal64 fpreal
Definition: SYS_Types.h:278
The scheduler cannot handle the work item at this time.
Definition: PDG_Scheduler.h:87
GLuint index
Definition: glcorearb.h:786
The scheduler is busy, and cannot process additional work items.
SYS_FORCE_INLINE void append(char character)
UT_StringHolder debugName() const override
Debug name method, required by PDGE_DependencyOwner.
The scheduler is ready to accept work items.
virtual PDGE_Dependency::State evalResolve(PDGE_Resolutions &, const PDGE_Evaluator &, PDGE_Dependency *)
Called when a dependency owned by this object is resolved.
Class to store JSON objects as C++ objects.
Definition: UT_JSONValue.h:99
void pause(int delay) noexcept
Definition: thread.h:103
bool process(T &func, UT_WorkBuffer &fullpath, exint fullpath_len, const UT_StringArray &paths, const UT_Array< FS_Stat > &stats)
Utility function to process the contents of the traverse() function.
Definition: FS_Traverse.h:24
#define UT_ConcurrentPriorityQueue
bool evaluate(int index, fpreal &result, const PDG_WorkItem *work_item, UT_WorkBuffer &errors) const
PDGE_Dependency * isCookedDep()
Returns the is cooked dep for this scheduler.
virtual void resetOwner()
Resets the owner.
static const UT_StringHolder theNameKey
The key for the name of the scheduler when writing it to JSON.
tbb::concurrent_unordered_set< K, H, P, A > UT_ConcurrentSet
GLint GLsizei count
Definition: glcorearb.h:405
SYS_FORCE_INLINE bool isstring() const
virtual const PDG_EventFilter & supportedEventTypes() const =0
Returns the list of supported event types for this emitter.