Python Scheduler's number of concurrent operations

User Avatar
Member
15 posts
Joined: Dec. 2019
Offline
Hi, I'm trying to figure out how to set the Python scheduler to cook more than one work item at a time. This seems like it would either be the default behavior or be easy to configure. I've tried finding, adding and adjusting various spare parameters (no effect) and have combed the documentation for the PDG scheduler, but I'm still at a loss.

The idea would be that we'd be submitting these processes to the cloud, so we'd want as many as possible. But I'm trying to test locally with a python scheduler to get it up and running first.

Here it shows that it's executing one at a time. Any idea how to get say, 3 at a time?
Edited by BitreelTD - April 7, 2022 17:44:14

Attachments:
python scheduler 1 at time.jpg (409.6 KB)

User Avatar
Staff
543 posts
Joined: May 2014
Offline
Your onSchedule implementation needs to return immediately -- it can't wait for the item to cook. It should instead submit the work item to whatever will be cooking it, and then return immediately.

The way it works for the local scheduler, for example, is the onSchedule function spawns a child process for the work item, and then returns as soon as the API call to start the process completes. The scheduler's onTick callback then checks the status of all actively running work item processes. For ones that have completed it marks the work item as succeeded/failed based on the return code of the process. It ignores processes that are still running, unless they've hit the run limit in which case it kills them.

The Python code for the local scheduler is available in $HFS/houdini/pdg/types/schedulers/local.py for reference. Note that the local scheduler stores a list of actively running processes as a member variable on itself. Also note that it uses subprocess.Popen to spawn the process without waiting for it to complete, and uses process.poll(..) to check the status of a running process at a later point.
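
As a rough illustration of the spawn-then-poll pattern described above, here is a minimal standalone sketch that uses only the standard subprocess module. It is not the local scheduler's code: the function names (spawn, poll_all, run_demo) and the placeholder jobs are hypothetical, and the while loop merely stands in for repeated onTick calls.

```python
import subprocess
import sys
import time


def spawn(command):
    """Start a child process and return immediately, without waiting for it."""
    return subprocess.Popen(command)


def poll_all(running):
    """Split (name, process) pairs into still-running and finished lists.

    The finished list holds (name, exit_code) tuples.
    """
    still_running, finished = [], []
    for name, proc in running:
        exit_code = proc.poll()  # None while the process is still running
        if exit_code is None:
            still_running.append((name, proc))
        else:
            finished.append((name, exit_code))
    return still_running, finished


def run_demo():
    # Hypothetical placeholder jobs: two exit with 0, one exits with code 3.
    jobs = {
        "ok_a": [sys.executable, "-c", "import time; time.sleep(0.2)"],
        "ok_b": [sys.executable, "-c", "pass"],
        "bad": [sys.executable, "-c", "import sys; sys.exit(3)"],
    }
    # All jobs are started up front, so they run concurrently.
    running = [(name, spawn(cmd)) for name, cmd in jobs.items()]
    results = {}
    while running:  # stands in for the scheduler calling onTick repeatedly
        running, finished = poll_all(running)
        results.update(finished)
        time.sleep(0.05)
    return results


if __name__ == "__main__":
    print(run_demo())
```

The key point is that spawn never blocks, so the number of concurrent jobs depends only on how many processes have been started and not yet reaped by the polling step.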

With farm schedulers like HQ or Tractor, the onSchedule method works in the same way. It makes the appropriate farm scheduler API call to create a new job, and returns as soon as that job is accepted by the farm system. It doesn't wait for the job to cook. When the job finishes, typically the farm job itself notifies the scheduler over RPC that the work item has finished -- for example with HQ's per-job Success or Fail callbacks.
Edited by tpetrick - April 7, 2022 18:23:33
User Avatar
Member
15 posts
Joined: Dec. 2019
Offline
Thanks for the reply. I think we now have a clear understanding of what needs to happen. Is there a simple working example of this submit-and-tick behavior? I know the local scheduler does it, but that's a lot of code to wade through considering we already have a functional result that just isn't concurrent.
User Avatar
Member
15 posts
Joined: Dec. 2019
Offline
tpetrick
stores a list of actively running processes as a member variable on itself

Could anyone point me to where in the code this is happening?
User Avatar
Staff
543 posts
Joined: May 2014
Offline
On the local scheduler, self.run_list contains the list of active work items. During onSchedule the work item is spawned using subprocess.Popen, which is non-blocking, and added to self.run_list. During onTick the scheduler iterates over all of the entries in the running item list, and calls poll(..) on each process to check the status of that process.

Here is a very basic example of how you might implement that using the Python Scheduler:

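onSchedule:
import subprocess

# Write out the job directories and the work item's data before spawning
self.createJobDirsAndSerializeWorkItems(work_item)

# Expand the command tokens, then start the process without waiting for it
item_command = self.expandCommandTokens(work_item.command, work_item)
proc = subprocess.Popen(item_command, shell=True)

# Create the list of running processes the first time an item is scheduled
if not hasattr(self, "__runlist"):
    self.__runlist = []

# Remember the process so that onTick can poll it later
self.__runlist.append((proc, work_item.id))
self.workItemStartCook(work_item.id, -1)

print("Starting {}".format(work_item.id))

# Return right away; the item keeps cooking in the background
return pdg.scheduleResult.Succeeded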

onTick:
if hasattr(self, "__runlist"):
    # Iterate over a copy, since finished entries are removed inside the loop
    for entry in list(self.__runlist):
        # poll() returns None while the process is still running
        exit_code = entry[0].poll()
        if exit_code is not None:
            self.__runlist.remove(entry)
            print("Done {} with status {}".format(entry[1], exit_code))
            if exit_code == 0:
                self.workItemSucceeded(entry[1], -1, 0)
            else:
                self.workItemFailed(entry[1], -1, 0)

This simple example has no limit on the number of work items that can run at a time. If you want to limit them, your scheduling code needs to check the number of active work items and return pdg.scheduleResult.Deferred or pdg.scheduleResult.FullDeferred if it wishes to defer available work items until later. It also does not handle batch work items, configuring the job environment, etc.
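
To illustrate the deferral idea outside of Houdini, here is a minimal standalone sketch. Everything in it is an assumption made for illustration: the LimitedRunner class, the MAX_ACTIVE value, and the SUCCEEDED/DEFERRED strings are stand-ins for pdg.scheduleResult.Succeeded and pdg.scheduleResult.Deferred, not part of the PDG API. In a real Python Scheduler the same length check would go at the top of onSchedule.

```python
import subprocess
import sys

MAX_ACTIVE = 3  # hypothetical limit chosen for illustration, not a PDG parameter

# Stand-ins for pdg.scheduleResult.Succeeded and pdg.scheduleResult.Deferred
SUCCEEDED = "Succeeded"
DEFERRED = "Deferred"


class LimitedRunner:
    """Toy analogue of a scheduler that caps the number of active processes."""

    def __init__(self, max_active=MAX_ACTIVE):
        self.max_active = max_active
        self.run_list = []  # (process, item_id) pairs

    def schedule(self, item_id, command):
        """Analogue of onSchedule: defer when the cap is reached."""
        if len(self.run_list) >= self.max_active:
            return DEFERRED
        proc = subprocess.Popen(command)  # returns without waiting
        self.run_list.append((proc, item_id))
        return SUCCEEDED

    def tick(self):
        """Analogue of onTick: reap finished processes.

        Returns a list of (item_id, exit_code) for processes that finished.
        """
        finished = []
        for entry in list(self.run_list):  # copy, since entries are removed
            exit_code = entry[0].poll()
            if exit_code is not None:
                self.run_list.remove(entry)
                finished.append((entry[1], exit_code))
        return finished
```

A deferred item is simply offered again later; once tick has reaped a finished process, the next schedule call for that item can succeed.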
Edited by tpetrick - April 12, 2022 11:32:23
User Avatar
Member
15 posts
Joined: Dec. 2019
Offline
Thank you, that's incredibly helpful. I'm working through it and testing it out.

This might be an easy one, but I'm very rusty and a bit unfamiliar with subprocess (I'm not a developer; I'm trying to digest this for the developers). Why am I getting exit code 127?

Attachments:
exit_code.jpg (311.8 KB)

User Avatar
Staff
543 posts
Joined: May 2014
Offline
That's most likely because HDA Processor relies on a number of environment variables, such as $PDG_ITEM_ID, which it uses to load the work item's JSON data file when running out of process. The example snippet I pasted is the bare minimum needed to run work items. It doesn't set up the job environment, for example, so it can't run work items that depend on it. You'd need to do something like the following when spawning the process:

import os
import subprocess

# Start from the current environment and add the variables the job expects
job_env = os.environ.copy()
job_env['PDG_RESULT_SERVER'] = str(self.workItemResultServerAddr())
job_env['PDG_ITEM_NAME'] = str(work_item.name)
job_env['PDG_ITEM_ID'] = str(work_item.id)
job_env['PDG_DIR'] = str(self.workingDir(False))
# temp_dir is not defined in this snippet; set it to the job's temp directory first
job_env['PDG_TEMP'] = temp_dir
job_env['PDG_SCRIPTDIR'] = str(self.scriptDir(False))

# run the given command in a shell
proc = subprocess.Popen(item_command, shell=True, env=job_env)
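
To see the effect of the env argument in isolation, here is a small standalone sketch that runs without Houdini. The helper names build_job_env and read_child_var are hypothetical, and the value 42 is only a placeholder; the sketch just shows that a child process sees the variables passed through env and reports them as missing otherwise.

```python
import os
import subprocess
import sys


def build_job_env(extra):
    """Copy the current environment and add extra variables as strings."""
    env = os.environ.copy()
    env.update({key: str(value) for key, value in extra.items()})
    return env


def read_child_var(name, env):
    """Run a child Python process that prints one environment variable."""
    code = "import os; print(os.environ.get({!r}, 'MISSING'))".format(name)
    out = subprocess.check_output([sys.executable, "-c", code], env=env)
    return out.decode().strip()


if __name__ == "__main__":
    # Placeholder value; in the scheduler this would be str(work_item.id)
    env = build_job_env({"PDG_ITEM_ID": 42})
    print(read_child_var("PDG_ITEM_ID", env))
```

Environment values must be strings, which is why both the snippet above and this sketch wrap each value in str().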
User Avatar
Member
15 posts
Joined: Dec. 2019
Offline
After some testing I've managed to put this stuff together into a working prototype. Thanks very much for your help and support. This would have taken exponentially longer without it.

One thing I would suggest is adding examples like these to the official help documents. Anything that bridges the gap between developers who don't use Houdini and Houdini TDs who aren't dedicated Python programmers would help.