Last modified 10 years ago Last modified on 12/09/2009 06:29:14 PM

Creating a Stage Implementation

The instructions in this page for creating a Stage require pex_harness v3.4 or later.

In addition to the tips and examples given on this page, you will also benefit from the example that comes with the pex_harness package under examples/simpleStageTest. The file contains a complete, annotated implementation of a stage as well as a script for testing it using SimpleStageTester.

Stage File Locations for DC3b

Application functions that execute science algorithms should go into appropriate packages, including ip_utils and meas_utils if no more specific package is appropriate. The code should go into the python subdirectory of the package, and any dictionary used by the code should go into the policy subdirectory of the package.

Pipeline stage wrappers for the application functions should be thin, primarily focused on clipboard manipulation, and should live in ip_pipeline and meas_pipeline. Dictionaries for the stages should go into the policy subdirectory of those packages. You will likely need to copy dictionaries from the application function package policy subdirectory in order to make this work.

Any policy overrides for the stage dictionaries, including those needed to specify clipboard keys used to hook together the pipelines, should go into the datarel package.

Stage API - Updated for DC3b and Beyond

The updated Stage API introduces separate classes to encapsulate the serial and parallel components of the pipeline processing for a given modular stage. We define an abstract class lsst.pex.harness.stage.StageProcessing and introduce two classes lsst.pex.harness.stage.SerialProcessing and lsst.pex.harness.stage.ParallelProcessing that subclass the abstract class and manage the serial and parallel parts of stage processing, respectively.

A Stage developer writes a class that extends SerialProcessing if work for that Stage needs to be done within the single serial Pipeline, overwriting either or both of the preprocess and postprocess methods. The clipboard that carries data from incoming events or previous stages is passed as an argument to the preprocess / postprocess method. With the inclusion of the Clipboard instance as an argument to the functions, Clipboard management is greatly simplified and it is no longer necessary for the Stage developer to pull the instance from the InputQueue or add it to the OutputQueue.

import lsst.pex.harness.stage as harnessStage

class SampleStageSerial(harnessStage.SerialProcessing):

    def preprocess(self, clipboard):
        Processing code for this Stage to be executed by the main Pipeline prior to invoking Slice process 

    def postprocess(self, clipboard):
        Processing code for this Stage to be executed by the main Pipeline after invoking Slice process 

The developer writes a class that extends ParallelProcessing if computation for that Stage needs to be done within the parallel Slice(s), within the process method. The clipboard that carries input/output data is passed as an argument to the process method.

import lsst.pex.harness.stage as harnessStage

class SampleStageParallel(harnessStage.ParallelProcessing):

    def process(self, clipboard):
        Processing code for this Stage to be executed within a Slice 

Finally, you may optionally create a Stage class. This is makes testing (and interactive execution) convenient but it is not required for running your stage in a full production pipeline. Here's what it might look like assuming the stage has both serial and parallel components:

class SampleStage(harnessStage.Stage):
    serialClass = SampleStageSerial
    parallelClass = SampleStageParallel

If it only has a parallel part, it's even easier:

class SampleStage(harnessStage.Stage):
    parallelClass = SampleStageParallel

What is this for, you may ask? For testing with SimpleStageTester; see section on Testing below.

Specifying a Stage class in Pipeline Policy

See also how to pass a Policy to your Stage for internal configuration

If a Stage requires both serial and parallel processing, then two application subclasses of lsst.pex.harness.stage.StageProcessing should be created. In the current example these are called SampleStageSerial (which inherits from lsst.pex.harness.stage.SerialProcessing ) and SampleStageParallel (which inherits from lsst.pex.harness.stage.ParallelProcessing). The class names of the application stages are entered into the Pipeline policy within an appStage subpolicy under the attributes serialClass and parallelClass , respectively:

appStage: {
     name: "SampleStage"
     serialClass: "lsst.pexhexamples.pipeline.SampleStageSerial"
     parallelClass: "lsst.pexhexamples.pipeline.SampleStageParallel"
     eventTopic: "None"
     stagePolicy: @policy/samplestage.paf

If a Stage only requires, for example, parallel processing, then only that application stage class needs to be entered into the policy:

appStage: {
     name: "SampleStage"
     parallelClass: "lsst.pexhexamples.pipeline.SampleStageParallel"
     eventTopic: "None"
     stagePolicy: @policy/samplestage.paf

In this case the pipeline harness provides an instance of a special (and trivial) "no operation" class lsst.pex.harness.stage.NoOpSerialProcessing to serve as a placeholder for the serialClass for this Stage. Analogously, for the case of no parallel processing by the Stage, an instance of lsst.pex.harness.stage.NoOpParallelProcessing is provided for parallelClass by the harness.

Initializing your stage in setup()

1. setup() is called by the harness

In lieu of overwriting stage __init__ methods, within the updated Stage API a setup method is made available to performed required initializations to set up the internal state of the stage. This method will be invoked at the end of the StageProcessing initialization if it is present. An example use for the setup() would be to retrieve values from the stage policy:

class SampleStageSerial(harnessStage.SerialProcessing):

    def setup(self):
        # self.policy is the predefined instance of the Stage policy 
        if self.policy.exists('RunMode'):
            self.runMode = self.policy.getString('RunMode')

The harness makes the stage policy available automatically via the instance self.policy. Other attributes made available to the application stage by default include self.rank, self.stageId, self.runId, and self.universeSize.

2. Loading default settings for your stage

We are now encouraging that policy data be loaded primarily from a default policy file which is stored under the installation directory of the home package for the stage. The policy passed in via the framework, then, is fairly "thin" with settings that are specific to its use in a pipeline. The setup() function is the place to load default policy data for the stage. Here is the pattern you should use:

    def setup(self):
        # this assumes that the default dictionary is in $MYPACKAGE_DIR/policy/mystage_dict.paf
        policyFile = pexPolicy.DefaultPolicyFile("mypackage", "mystage_dict.paf", "policy")

        defPolicy = pexPolicy.Policy.createPolicy(policyFile, policyFile.getRepositoryPath(), True)
        if self.policy is None:
            self.policy = Policy()

This same pattern can be used within other classes used by the stage that are configured by a policy. See PolicyHowto for more information.

Logging within a Stage

The new API now provides you with a Log instance to use to log messages; it is accessible via self.log. Unlike in the past, it is quite important to use this connected with your Stage rather than creating a new one. The reason is your stage shares that log with the entire pipeline. Previous stages may have attached properties (like a visit name) that would be useful to have recorded with your message. Here are some examples:

from lsst.pex.logging import Log, Rec, Prop, Debug

       self.log.log(Log.INFO, "%d sources found" % sourceCount)
       self.log.log(Log.WARN, "Number of sources is suspiciously low")

For more examples see the LoggingHowto and in pex_logging's examples/

You can create child logs from self.log as you please:

       mylog = Log(self.log, "kernelBuilder")
       mylog.log(Log.DEBUG, "Starting kernel building...")

If you are used to writing a log of trace/debug messages, you can create get some syntactic sugar from the Debug Log:

       self.log = Debug(self.log, "MyStage")
       self.log.debug(3, "kernel complete")

Testing Your Stage Implementation

SimpleStageTester is your class for testing. It is design for testing your stage in a single slice or as the master pipeline process. It cannot test stages that require multiple iterations (see #1069) or cross-slice communication. Here's an example of how you might run it:

    stagePolicy = pexPolicy.Policy()   # we will rely on default policy loaded internally in the stage
    tester = SimpleStageTester( Sampletage(stagePolicy) )

    # set the verbosity of the logger.  If the level is at least 5, you
    # will see debugging messages from the SimpleStageTester wrapper.

    # create a simple dictionary with the data expected to be on the
    # stage's input clipboard.  If this includes images, you will need to 
    # read in and create the image objects yourself.
    clipboard = dict( width=1.0, height=2.0 )

    # you can either test the stage as part of a Master slice (which runs
    # its preprocess() and postprocess() functions)...
    outMaster = tester.runMaster(clipboard)

    # ...or you can test it as part of a Worker.  Note that in the current
    # implementation, the output clipboard is the same instance as the input
    # clipboard.  
    clipboard = dict( width=1.0, height=2.0 )
    outWorker = tester.runWorker(clipboard)

    # check the results on the outWorker clipboard.  

See examples/simpleStageTest for more details.

SimpleStageTester can also run a sequence of stages (one thread at a time). Just use its addStage() function to add more stages after an initial one.

IOStage configuration within a Pipeline

For the common use case where IOStage is used to place input data onto the clipboards for parallel Slice workers, the IOStage can be configured:

appStage: {
     name: "InputStage"
     parallelClass: "lsst.pex.harness.IOStage.InputStageParallel"
     eventTopic: "triggerImageprocEvent"
     stagePolicy: @policy/input_policy.paf

The class lsst.pex.harness.IOStage.InputStageParallel of the pex_harness extends lsst.pex.harness.stage.ParallelProcessing and performs the data retrieval via the persistence layer as specified by the input stage policy. Output of data from a Slice's clipboard is similarly handled by lsst.pex.harness.IOStage.OutputStageParallel , with a sample entry within pipeline policy taking the form:

appStage: {
     name: "OutputStage"
     parallelClass: "lsst.pex.harness.IOStage.OutputStageParallel"
     eventTopic: "None"
     stagePolicy: @policy/output_policy.paf

SymLinkStage configuration within a Pipeline

The setup of local links in the pipeline run directory to locations for data input, output, working scratch space, etc is conformable to a serial operation. Hence we define within the pex_harness a SymLinkStageSerial to perform this task for pipelines, which can be configured using:

appStage: {
    name: "SymLinkStage"
    serialClass: "lsst.pex.harness.SymLinkStage.SymLinkStageSerial"
    eventTopic: "None"
    stagePolicy: @policy/symlink_stage.paf

Deprecated API: DC3a and prior

This page examines some issues and topics related to creating an application Stage implementation that inherits from the generic lsst.dps.Stage class.

Application Stage Methods

The easiest path to implement an application Stage to be imported in to a Pipeline or Slice is to write it in Python. The application Stage should inherit the generic lsst.dps.Stage, and should implement the Stage API

  • preprocess()
  • process()
  • postprocess()

A short and simple Python application Stage may look like :

from lsst.dps.Stage import Stage 

class App1Stage(Stage): 

    def preprocess(self):
        print 'Python App1Stage preprocess : stageId %d' % self.stageId

    def process(self):
       print 'Python App1Stage process : stageId %d' % self.stageId
    def postprocess(self):
       print 'Python App1Stage postprocess : stageId %d' % self.stageId

The preprocess() and postprocess() are serial methods executed by the main Pipeline application.

Input Clipboard for the Stage

The application Stage will obtain its input parameters from a Clipboard object. The Clipboard is retrieved from the input Queue of the Stage by means of the Queue API method getNextDataset(). The initial Clipboard encountered by an application Stage will have a single name-value pair entry that has been set up by the middleware Pipeline/Slice?. Taking Slice as an example, at the beginning of its loop over Stages the Slice will execute a handleEvents() method that will wait for, receive, and process an incoming event that carries the parameters required by the first Stage:

    # This is middleware code: need not be implemented by Stage developers
    eventReceiver = events.EventReceiver(self.eventHost, self.eventTopic)

    print 'Python Slice handleEvents() - waiting on receive...
    inputDataPropertyPtrType = eventReceiver.receive(80000)
    print 'Python Slice handleEvents() - received event.

    key1 = "inputData"

    clipboard = Clipboard()
    clipboard.put(key1, inputDataPropertyPtrType)
    queue1 = self.queueList[0]

The key "inputData" will have as value the Python proxy for a DataProperty::PtrType. The required input parameters needed by the Stage are extracted from the DataProperty's referred to by this value. In this current picture, either 1) the developer will disassemble the DataProperty::PtrType proxy within the application Stage code or, in an effort to keep Stages more general, 2) the disassembling will occur an input Queue properly configured with a Policy.

The configuration of the first input Queue through a Policy ensures that the correct input parameters are placed on the Clipboard by the Pipeline/Slice?. The topic of the incoming event that the running Pipeline/Slice? should listen for and receive is specified in the Policy. The payload of the event is added to the Clipboard upon arrival and the Pipeline/Slice? proceeds to execute the Stage. This approach relies on some concensus as to what parameters are contained with events of a given topic (e.g., a "mopsevent" would contain FOVTime, FOVRA, FOVDEC, etc). With agreement on the contents of events of a given topic, the Pipeline/Slice? middleware need not delve into those details but rather serve strictly to forward the appropriate parameters to the first Stage.

The Stage methods executed by the main Pipeline (executing the serial portions of the processing) might naturally retrieve the Clipboard from the input Queue at the start of the preprocess() method and post the Clipboard to the output Queue at the end of the postprocess() method, positioning the Clipboard for the next Stage:

    def preprocess(self):
        self.dataClipboard = self.inputQueue.getNextDataset()

    def postprocess(self):

The Stage method process() executed by the parallel Slice workers might naturally retrieve the Clipboard from the input Queue at the start and post the Clipboard to the output Queue at the end, positioning the Clipboard for the next Stage:

    def process(self):
        self.dataClipboard = self.inputQueue.getNextDataset()

The Clipboards that move Stage by Stage through the main Pipeline and the individual Slices are currently independent, though within the UML model the concept of "sharing" data between these Clipboards does exist. We still need to determine whether this functionailty is needed for DC2.

Application Stage Policy Files

The application stage policy files for a Pipeline are listed within the main pipeline policy file. An example Pipeline policy file pipeline_policy.json might take the form:

    "appStages": ["lsst.dps.IOStage.InputStage", "lsst.apppkg.SampleStage", "lsst.dps.IOStage.OutputStage"],
    "eventTopics": ["None", "None", "None"],
    "stagePolicies": ["input_policy.paf", "None", "output_policy.paf"]

With the stage policy included in this manner, the application stage will possess a self._policy Policy object automatically, i.e., the Pipeline framework will construct Policy objects and invoke the Stage constructor with the Policy object as an argument, and the self._policy will be set.