Developer Notes

primrose provides a small, simple, and consistent interface for developing your own DAG nodes.

The core concept is that of a Node, a single node of a DAG. This is defined with the abstract class primrose.base.AbstractNode.

Some example custom nodes can be seen in the default project directory that’s created when running primrose create-project: AwesomeReader and AwesomeModel.

There are two defined methods:

necessary_config(node_config)

Almost all nodes will need some parameterization. A reader needs to know from where to read, a model needs to know whether it is running in training or eval mode, a writer needs to know where to write, and so on.

One of the goals of the primrose framework is to catch configuration errors as early as possible, before the job actually starts. For this reason, we have implemented many configuration checks within the Configuration class (detailed here). One of those checks is to check that the nodes have all the information that they need to run. For this reason, we include a necessary_config(node_config) method in the Node interface.

If a CsvReader needs to know a filename so that it can perform its task, we explicitly tell the Configuration object to look for a filename key in the section of configuration that defines that node.

To be explicit, suppose CsvReader.necessary_config() returns set(['filename']). Then, if the configuration file has

  "read_data": {
    "class": "CsvReader",
    "filename": "data/tennis.csv",
    "destinations": [
      "encode_and_split"
    ]
  }

Configuration knows that this is a “good” configuration and it passes its check.

All the parameters, i.e.

  {
    "class": "CsvReader",
    "filename": "data/tennis.csv",
    "destinations": [
      "encode_and_split"
    ]
  }

are available to the Node class via self.node_config.
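As an illustrative sketch, here is how the key declared in necessary_config() becomes available via self.node_config. The class name and constructor are simplified stand-ins for this example only; a real node subclasses primrose.base.node.AbstractNode, which wires node_config up for you:

```python
# Illustrative stand-in for a CsvReader-like node; not the real primrose class.
class CsvReaderSketch:

    def __init__(self, node_config):
        # primrose populates this from the node's section of the config file
        self.node_config = node_config

    @staticmethod
    def necessary_config(node_config):
        # Configuration verifies every key in this set is present
        # in the node's section before the job starts
        return set(["filename"])

    def run(self, data_object):
        # validated parameters are available via self.node_config
        filename = self.node_config["filename"]
        # ... a real reader would load the CSV and add it to data_object ...
        return data_object, False
```

With the "read_data" section above, necessary_config() is a subset of the section's keys, so the configuration check passes.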

run(data_object)

The other method in AbstractNode is run(data_object). This is the method that implements a node’s functionality. It receives the data_object, which is an instance of DataObject, and so has access to all the data from upstream nodes of the DAG. It performs its function, can (but doesn’t have to) add to or otherwise modify the data within the DataObject, and returns it.

The full interface for run(data_object) is

  """
      run the node. For a reader, that means read, for a writer that means write etc.

  Args:
      data_object (DataObject): DataObject instance

  Returns:
      (tuple): tuple containing:

          data_object (DataObject): instance of DataObject

          terminate (bool): terminate the DAG?

  """

and so it returns not only the data_object but also a flag telling the DagRunner whether it should terminate the whole DAG. For instance, suppose that at the start of the DAG, a reader is supposed to read some data but the dataset it receives is empty. There may be no point running any of the downstream nodes, so the reader can flag to stop the whole job by returning terminate=True.
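A minimal sketch of that pattern, with the base class omitted and a hypothetical 'rows' parameter standing in for real file input:

```python
# Illustrative only: a reader that halts the DAG when its input is empty.
# A real node would subclass primrose.base.node.AbstractNode and read
# from an actual source rather than from node_config.
class EmptyAwareReader:

    def __init__(self, node_config):
        self.node_config = node_config

    def run(self, data_object):
        # 'rows' is a stand-in for data read from a file or database
        rows = self.node_config.get("rows", [])
        # signal the DagRunner to stop the whole DAG if there is no data
        terminate = len(rows) == 0
        return data_object, terminate
```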

Complete example: sleep

Let’s suppose that you wanted to develop a new node whose job was to sleep for 5 seconds. We’ll parameterize and externalize the duration as a key in the config.

Your class (ignoring pydocs here for clarity) would look like:

from primrose.base.node import AbstractNode
import time

class SleepNode(AbstractNode):

  @staticmethod
  def necessary_config(node_config):
     return set(['duration'])

  def run(self, data_object):
      time.sleep(self.node_config['duration'])
      terminate = False
      return data_object, terminate

and that’s it. The primrose framework takes care of the rest. For better code, you might want to add checks that duration is numeric, that its value is non-negative, and so on, but you get the idea.
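The hardening suggested above might look like the following sketch. The specific exception types and the exclusion of bool are choices of this example, not part of the primrose interface, and the base class is omitted so the snippet stands alone:

```python
import numbers
import time

# Illustrative hardened variant of SleepNode; a real node would
# subclass primrose.base.node.AbstractNode.
class CheckedSleepNode:

    def __init__(self, node_config):
        self.node_config = node_config

    @staticmethod
    def necessary_config(node_config):
        return set(["duration"])

    def run(self, data_object):
        duration = self.node_config["duration"]
        # reject non-numeric values (bool is technically numeric in Python,
        # so exclude it explicitly)
        if not isinstance(duration, numbers.Number) or isinstance(duration, bool):
            raise TypeError("duration must be numeric")
        if duration < 0:
            raise ValueError("duration must be non-negative")
        time.sleep(duration)
        return data_object, False
```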

After registering your class (see next section), you can then use it in a configuration file such as


  "sleeptime": {
    "class": "SleepNode",
    "duration": 4.5,
    "destinations": [
      "some_other_node"
    ]
  }

Registering Your Classes

Now that you’ve written your own classes that implement the Node interface, how do you register them given that these nodes are in your project and you are importing primrose?

If running primrose with a configuration file, all you need to do is set the environment variable PRIMROSE_EXT_NODE_PACKAGE.

When configuration is validated, there is a check to make sure that all classes implement AbstractNode. If a class is not already registered, there is an attempt to register by searching through the given PRIMROSE_EXT_NODE_PACKAGE for the class name. If there is a match, the class is automatically imported and registered. The variable PRIMROSE_EXT_NODE_PACKAGE can be set to a package installed in pip (e.g. my_nodes), a python file (e.g. src/my_nodes/node1.py), or a directory to a package (e.g. src/my_nodes). If you would rather specify the package in your configuration file, you can set the key class_package in the metadata section. If both values are set, PRIMROSE_EXT_NODE_PACKAGE will take precedence.

If you want to specify a specific prefix for a class, you can set the value class_prefix in your node config. This class prefix will be appended to the set PRIMROSE_EXT_NODE_PACKAGE. This can either be specified in python dot notation (e.g. src.mynodes) or a path src/mynodes.py. This could be useful if you are importing nodes from multiple packages or from multiple locations.

Here is an example of how your configuration may look if your nodes are in the path src/mynodes/awesome_node.py. In the first method, you can just specify PRIMROSE_EXT_NODE_PACKAGE=src and primrose will find your custom node:

  implementation_config: {
  ...
    read_data: {
      "class": "MyAwesomeNode",
      "destinations": [
        "encode_and_split"
      ]
    }
  ...
  }

Alternatively, you can set the class_package and/or class_prefix variables to explicitly define a class location:

  metadata: {
    class_package: "src"
  },
  implementation_config: {
  ...
    read_data: {
      "class": "MyAwesomeNode",
      "class_prefix": "mynodes/awesome_node.py",
      "destinations": [
        "encode_and_split"
      ]
    }
  ...
  }

Conditional Pathing

During machine learning jobs, one often has to make decisions dynamically depending on characteristics of the data at runtime: if there is drift, then retrain the model; if the data is too large to fit in RAM, handle in the cloud; if the detected language is French, use the French model etc.

One option is to bake this conditional logic within a node. However, another option supported in primrose is to have conditional paths in the DAG. That is, if some condition is met at a given node, the DAGRunner should follow one or more of the destinations (and their subgraphs) and should ignore one or more other destinations (and their subgraphs). This is more easily explained with a diagram.

In this simple DAG, the reader flows into a node called conditional_node. This flows into two destinations: left and right. This conditional_node could flow into both left and right, as would normally be expected. However, in this case, it could signal to the DAGRunner that only the left path should run and the right path (right and right2) should be dropped from the DAG. It could also signal that only the right path should run and the left path should be dropped.

How does one develop such nodes? One of the primrose.base node types is AbstractConditionalPath. These nodes extend AbstractNode and so they must implement run(self, data_object). However, they must also implement a method destinations_to_prune(self). If the node does not want to prune a path, it returns None. However, if it does want to prune a path, it returns a list of one or more of its destination nodes. In the example above, it could return ['left'], ['right'] or ['left','right'].

For any AbstractConditionalPath-type node, the DAGRunner will call the node’s destinations_to_prune() after run(self, data_object). Thus, it has access to the data_object and any processing during the run method with which to make the pruning decision.
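A hedged sketch of this pattern, using language detection as in the motivating example. The class name, the 'language' parameter, and the destination names are invented for illustration, and the base class is omitted so the snippet stands alone; a real node would subclass the AbstractConditionalPath base:

```python
# Illustrative conditional-path node: routes to a language-specific model
# by pruning the other branch. Names here are hypothetical.
class LanguageRouter:

    def __init__(self, node_config):
        self.node_config = node_config
        self._detected = None

    def run(self, data_object):
        # run() executes first; store whatever the pruning decision needs
        # (a real node would detect the language from data_object)
        self._detected = self.node_config.get("language")
        return data_object, False

    def destinations_to_prune(self):
        # called by the DAGRunner after run(); return None to keep all
        # destinations, or a list of destination names to drop
        if self._detected == "french":
            return ["english_model"]
        if self._detected == "english":
            return ["french_model"]
        return None
```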

Note: as these decisions are made dynamically at run time, the DRY_RUN feature of primrose is not able to demonstrate the precise final DAG that will be run. DRY_RUN has to assume that no DAG pruning will occur.

Next

Learn more about DataObject: DataObject.