Node Callbacks
The main callback method for partitioner nodes is onPartition. It receives the list of upstream work items as input, along with a factory object (the partition holder) for constructing partitions. The callback is expected to return a pdg.result value that indicates the status of the partitioning operation.
onPartition(self, partition_holder, work_items) → pdg.result
This callback is evaluated once for each partitioner during the cook of a PDG graph, or once for each unique attribute value (if Split by Attribute is turned on). If the partitioner is static, the callback is run during the static pre-pass. Otherwise, it is evaluated during the cook after all input work items have been generated. The list of upstream work items eligible for partitioning is passed to the function through the work_items argument. The partition_holder argument is an instance of the pdg.PartitionHolder class and is used to create partitions.
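The per-value evaluation described above can be pictured as grouping the upstream work items by attribute value and running the partitioning logic once per group. The following is a minimal plain-Python sketch of that idea, assuming work items are represented as dicts. The `split_by_attribute` helper is hypothetical. It is not part of the PDG API and does not describe how Houdini actually implements Split by Attribute.

```python
from collections import defaultdict


def split_by_attribute(work_items, attrib_name, on_partition):
    """Group items by an attribute value, then call on_partition once per group.

    Hypothetical illustration only: work items are plain dicts here,
    not pdg.WorkItem objects.
    """
    groups = defaultdict(list)
    for item in work_items:
        groups[item[attrib_name]].append(item)
    # One callback invocation per unique attribute value
    return {value: on_partition(items) for value, items in groups.items()}


items = [
    {"name": "a", "shot": 1},
    {"name": "b", "shot": 2},
    {"name": "c", "shot": 1},
]
print(split_by_attribute(items, "shot", lambda group: [i["name"] for i in group]))
# {1: ['a', 'c'], 2: ['b']}
```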
Each partition is identified by a unique number chosen by your onPartition implementation. To add a work item to a partition, call the addItemToPartition method with the work item itself and the partition number:
# Add each work item to its own unique partition
partition_holder.addItemToPartition(work_items[0], 0)
partition_holder.addItemToPartition(work_items[1], 1)

# Add both work items to a third, common partition
partition_holder.addItemToPartition(work_items[0], 2)
partition_holder.addItemToPartition(work_items[1], 2)
A work item can be added to several partitions, or to none at all. Sometimes a node needs to add a work item to every partition before it knows how many partitions it will create. The addItemToAllPartitions method marks a work item as belonging to all partitions, including partitions created after the call is made.
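To make the difference between addItemToPartition and addItemToAllPartitions concrete, the sketch below uses a hypothetical `SketchPartitionHolder` class that mimics the two methods with plain lists. It illustrates the documented behavior under simplifying assumptions (strings stand in for work items, and the `resolve` helper does not exist on pdg.PartitionHolder); it is not the real class.

```python
class SketchPartitionHolder:
    """Hypothetical stand-in for pdg.PartitionHolder, for illustration only."""

    def __init__(self):
        self._partitions = {}  # partition number -> list of items
        self._all_items = []   # items flagged as belonging to every partition

    def addItemToPartition(self, item, partition):
        self._partitions.setdefault(partition, []).append(item)

    def addItemToAllPartitions(self, item):
        self._all_items.append(item)

    def resolve(self):
        # "All partitions" items are merged in at the end, so they also land
        # in partitions created after addItemToAllPartitions was called.
        return {
            number: items + [i for i in self._all_items if i not in items]
            for number, items in sorted(self._partitions.items())
        }


holder = SketchPartitionHolder()
holder.addItemToAllPartitions("config")  # called before any partition exists
holder.addItemToPartition("a", 0)
holder.addItemToPartition("b", 1)
print(holder.resolve())  # {0: ['a', 'config'], 1: ['b', 'config']}
```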
You can also mark a work item as required for a partition by passing True as the third argument to addItemToPartition. If a required work item is deleted, the entire partition is deleted as well, even if other work items in the partition still exist. For example, the Partition by Combination node uses this behavior when it creates partitions from pairs of upstream work items: if one of the work items in a pair is deleted, the partition is no longer valid because it no longer represents a pair.
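The required-item rule can be modeled as a small function that decides which partitions survive a deletion. In this sketch the `apply_deletion` function and the `(item, required)` tuple layout are hypothetical; PDG tracks this internally. For partitions that lose only non-required items, the sketch simply drops those items and makes no further claim about PDG's handling.

```python
def apply_deletion(partitions, deleted):
    """Hypothetical model of the required-item rule.

    partitions maps a partition number to a list of (item, required) pairs.
    Returns the partitions that survive after the `deleted` items are removed.
    """
    surviving = {}
    for number, members in partitions.items():
        # Deleting any required member invalidates the whole partition
        if any(item in deleted and required for item, required in members):
            continue
        # Otherwise only the deleted, non-required members drop out
        surviving[number] = [item for item, _ in members if item not in deleted]
    return surviving


partitions = {
    0: [("a", True), ("b", True)],    # a pair: both items required
    1: [("a", False), ("c", False)],  # neither item required
}
print(apply_deletion(partitions, {"a"}))  # {1: ['c']}
```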
The following code is a possible implementation of an onPartition function that forms a partition for each unique pair of input work items:
def onPartition(self, partition_holder, work_items):
    partition_index = 0

    # Outer loop over the work items
    for index1, item1 in enumerate(work_items):
        # Inner loop over the work items
        for index2, item2 in enumerate(work_items):
            # We want to have only one partition for each pair, no matter what
            # the order. If we don't have this check we'll get a partition for
            # both (a,b) and for (b,a).
            if index2 <= index1:
                continue

            # Add both items to the next available partition, and flag the items
            # as required
            partition_holder.addItemToPartition(item1, partition_index, True)
            partition_holder.addItemToPartition(item2, partition_index, True)
            partition_index += 1

    return pdg.result.Success
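The nested loops combined with the `index2 <= index1` check visit each unordered pair exactly once, in the same order that Python's standard `itertools.combinations(work_items, 2)` produces them. The sketch below uses that function with strings standing in for work items; `pair_partitions` is a hypothetical helper name, not part of PDG.

```python
import itertools


def pair_partitions(work_items):
    """Return (partition_index, item1, item2) for each unordered pair.

    Mirrors the nested-loop onPartition example, with plain values
    standing in for work items.
    """
    return [
        (index, item1, item2)
        for index, (item1, item2) in enumerate(itertools.combinations(work_items, 2))
    ]


for entry in pair_partitions(["a", "b", "c", "d"]):
    print(entry)
# (0, 'a', 'b')
# (1, 'a', 'c')
# (2, 'a', 'd')
# (3, 'b', 'c')
# (4, 'b', 'd')
# (5, 'c', 'd')
```

Inside onPartition, the same loop could be written as `for partition_index, (item1, item2) in enumerate(itertools.combinations(work_items, 2)):` followed by the two addItemToPartition calls with the required flag set. For n input items, this creates n(n-1)/2 partitions.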
You can also implement the optional onPostPartition(self, partition, partition_items) method to run custom logic for each partition after the partitioning step finishes. The following is an example implementation that creates an array attribute on each partition with the list of work item IDs inside the partition:
def onPostPartition(self, partition, partition_items):
    item_ids = [item.id for item in partition_items]
    partition.setIntAttrib("contents", item_ids)
    return pdg.result.Success
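Because onPostPartition only reads the `id` of each work item and writes one attribute, its logic can be exercised outside Houdini with stand-in objects. In the sketch below, `SketchWorkItem`, `SketchPartition`, and `on_post_partition` are hypothetical names used for illustration; they are not PDG classes, and the stand-in `setIntAttrib` simply stores the list in a dict.

```python
class SketchWorkItem:
    """Hypothetical stand-in exposing only the `id` the example reads."""

    def __init__(self, item_id):
        self.id = item_id


class SketchPartition:
    """Hypothetical stand-in that records int attributes in a dict."""

    def __init__(self):
        self.int_attribs = {}

    def setIntAttrib(self, name, value):
        self.int_attribs[name] = list(value)


def on_post_partition(partition, partition_items):
    # Same logic as the onPostPartition example, minus `self` and pdg.result
    item_ids = [item.id for item in partition_items]
    partition.setIntAttrib("contents", item_ids)


partition = SketchPartition()
on_post_partition(partition, [SketchWorkItem(3), SketchWorkItem(7), SketchWorkItem(9)])
print(partition.int_attribs)  # {'contents': [3, 7, 9]}
```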