On the local scheduler, self.run_list contains the list of active work items. During onSchedule, the work item is spawned with subprocess.Popen, which is non-blocking, and added to self.run_list. During onTick, the scheduler iterates over the entries in the run list and calls poll() on each process to check its status.
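Outside of PDG, the same spawn-and-poll pattern can be sketched in plain Python (a minimal illustration only, not scheduler code):

```python
import subprocess
import sys

# Popen returns as soon as the child process is launched; it does not wait.
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(0.2)"])

# poll() is non-blocking: it returns None while the child is still running,
# and the exit code once the child has finished.
while proc.poll() is None:
    pass  # a real scheduler would return from onTick here and poll again next tick

print("exit code:", proc.poll())  # exit code: 0
```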
Here is a very basic example of how you might implement that using the Python Scheduler:
onSchedule:

    import subprocess

    self.createJobDirsAndSerializeWorkItems(work_item)
    item_command = self.expandCommandTokens(work_item.command, work_item)

    proc = subprocess.Popen(item_command, shell=True)
    if not hasattr(self, "__runlist"):
        self.__runlist = []
    self.__runlist.append((proc, work_item.id))

    self.workItemStartCook(work_item.id, -1)
    print("Starting {}".format(work_item.id))
    return pdg.scheduleResult.Succeeded
onTick:

    if hasattr(self, "__runlist"):
        # Iterate over a copy so entries can be removed safely inside the loop
        for entry in list(self.__runlist):
            exit_code = entry[0].poll()
            if exit_code is not None:
                self.__runlist.remove(entry)
                print("Done {} with status {}".format(entry[1], exit_code))
                if exit_code == 0:
                    self.workItemSucceeded(entry[1], -1, 0)
                else:
                    self.workItemFailed(entry[1], -1, 0)
This simple example puts no limit on the number of work items that can run at a time. To impose one, your scheduling code needs to check the number of active work items and return pdg.scheduleResult.Deferred or pdg.scheduleResult.FullDeferred to defer available work items until later. The example also does not handle batch work items, job environment configuration, and so on.
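To illustrate the deferral idea outside of PDG, here is a standalone sketch: a hypothetical run_with_cap helper (the name and the max_active cap are inventions for this example, not part of the PDG API) that uses the same spawn-and-poll pattern but refuses to start new processes while the cap is reached, mirroring what returning Deferred achieves in a real scheduler:

```python
import subprocess
import sys
import time

def run_with_cap(commands, max_active=2):
    """Run commands with at most max_active subprocesses alive at once."""
    pending = list(commands)
    run_list = []      # analogous to self.__runlist above
    exit_codes = []
    while pending or run_list:
        # "onSchedule": spawn only while under the cap; otherwise defer.
        while pending and len(run_list) < max_active:
            run_list.append(subprocess.Popen(pending.pop(0)))
        # "onTick": poll each active process without blocking.
        for proc in list(run_list):
            code = proc.poll()
            if code is not None:
                run_list.remove(proc)
                exit_codes.append(code)
        time.sleep(0.01)  # yield briefly between ticks
    return exit_codes

codes = run_with_cap([[sys.executable, "-c", "pass"]] * 5)
print(codes)  # [0, 0, 0, 0, 0]
```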