Last modified on 11/04/2009 03:58:29 PM

Revised Design for a Stage within the Pipeline Harness


The design presented in this document is a response to the request that the Stage API be split over two classes: one that handles the serial part and one that handles the parallel part. This better reflects the fact that the serial processing happens in a completely different process (possibly on a different machine), and therefore in a different class instance, from the parallel processing. Previously, these parts were captured in a single class; this led to the misconception that, for example, internal data changes made when the serial preprocess() function is executed would be visible when the parallel process() is executed. When the serial and parallel parts are split across different classes, it is clearer that they work with independent internal data.

The initial design was reviewed during a Middleware WG telecon on 9/9/09, and changes were recommended. This document has been updated to incorporate those suggestions.

The Revised Classes

The following UML design has (as of this writing) been integrated into the EA UML model (see full model documentation for these classes in the attached PDF).

In this design, a stage developer would create a subclass of SerialProcessing or ParallelProcessing or both, depending on which kinds of processing the stage requires. These class names are provided to the policy defining the stage. A Pipeline instance could then instantiate the SerialProcessing class (or a do-nothing default if one is not provided), and each Slice instance would instantiate the ParallelProcessing class.

The StageProcessing constructor is inherited by SerialProcessing and ParallelProcessing. The constructor arguments include data that would normally be passed in by the harness (the Pipeline and Slice, respectively), but which are not necessarily required in a simple scripting context. The setup() function provides a simpler API for SerialProcessing and ParallelProcessing implementations to provide additional initialization without overriding the constructor. setup() is intended to be called as the very last step of construction, after all standard internal data have been set.
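The construction order described here can be illustrated with a minimal sketch. The classes below are stand-ins written for this example, not the real lsst.pex.harness code; the constructor arguments, their defaults, and the ScaleStage class are assumptions.

```python
class StageProcessing:
    """Stand-in for the harness base class (hypothetical, for illustration)."""

    def __init__(self, policy=None, runId="", rank=-1, stageId=-1, name=None):
        # Standard internal data are set first...
        self.policy = policy if policy is not None else {}
        self.runId = runId
        self.rank = rank
        self.stageId = stageId
        self.name = name
        # ...and setup() runs as the very last construction step.
        self.setup()

    def setup(self):
        """Hook for subclasses; the default does nothing."""
        pass


class ParallelProcessing(StageProcessing):
    def process(self, clipboard):
        raise RuntimeError("process() not implemented")


class ScaleStage(ParallelProcessing):
    """Hypothetical stage that configures itself from policy in setup()."""

    def setup(self):
        # self.policy and self.rank are already available here.
        self.factor = self.policy.get("factor", 1)

    def process(self, clipboard):
        clipboard["value"] = clipboard["value"] * self.factor


# Simple scripting use: only the arguments that matter are supplied.
stage = ScaleStage(policy={"factor": 3}, rank=0)
board = {"value": 2}   # a plain dict stands in for a clipboard
stage.process(board)
```

Because setup() is invoked from the inherited constructor, the subclass never needs to repeat the constructor's argument list.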

Specifying the Stage in The Pipeline Policy

As before, a pipeline policy file would contain an array of stage definitions, given in the appStage parameter:

   appStage: {
      name:  Source Detection
      serialClass: "lsst.meas.pipeline.sourcedet.SerialPart"
      parallelClass: "lsst.meas.pipeline.sourcedet.ParallelPart"
      eventTopic: "None"
      stagePolicy: @IPSD/13-sourceDetection_policy.paf
   }

Like the old stageName parameter, the serialClass and parallelClass parameters both take a fully qualified name for a class. If a stage requires both a serial part and a parallel part, then both parameters must be provided; however, if the stage is purely parallel or purely serial, only the appropriate parameter needs to be specified.
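As a rough illustration of how fully qualified class names might be resolved, the sketch below uses the standard importlib module. The helper names load_class and stage_classes are hypothetical, the stage definition is represented as a plain dict rather than a policy object, and a standard-library class stands in for a real stage class.

```python
import importlib


def load_class(qualified_name):
    """Import and return the class named by a fully qualified dotted name."""
    module_name, _, class_name = qualified_name.rpartition(".")
    if not module_name:
        raise ValueError("expected a fully qualified name: %r" % qualified_name)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


def stage_classes(stage_def, default_serial=None, default_parallel=None):
    """Return (serial class, parallel class), using defaults for missing parts."""
    serial_name = stage_def.get("serialClass")
    parallel_name = stage_def.get("parallelClass")
    serial = load_class(serial_name) if serial_name else default_serial
    parallel = load_class(parallel_name) if parallel_name else default_parallel
    return serial, parallel


# A purely parallel stage: only parallelClass is given.
serial_cls, parallel_cls = stage_classes(
    {"parallelClass": "collections.OrderedDict"}
)
```

In this sketch a missing parameter simply falls back to whatever default the caller supplies, which mirrors the do-nothing default mentioned for the serial part.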

Subclassing the Stage Classes

Typically when subclassing SerialProcessing or ParallelProcessing, a developer will:

  • for ParallelProcessing, provide a process() function, or for SerialProcessing, both preprocess() and postprocess()

The default implementation of these functions will raise RuntimeErrors. Thus, one must at least provide a no-op implementation.

  • optionally, provide a setup() implementation.

This will be called via the constructor after all internal execution context data has been set. In this function, the stage can be configured using self.policy, and any of the other internal data (like self.rank) can be accessed during this setup.

A SerialProcessing subclass would look something like this (in this example, assuming only a preprocess function will be used):

import lsst.pex.harness.stage as harnessStage

class mySerialPart(harnessStage.SerialProcessing):

    def setup(self):                     # provide a setup if you need to configure from policy
        self.datum = self.policy.get('datum')  

    def preprocess(self, clipboard):     # clipboard is now passed in
        im = clipboard.get("target")
        newdata = 1
        clipboard.put("newdata", newdata)

    def postprocess(self, clipboard):    # not needed, so provide no-op
        pass

A ParallelProcessing subclass is similar but must also accept the rank parameter:

import lsst.pex.harness.stage as harnessStage

class myParallelPart(harnessStage.ParallelProcessing):

    def setup(self):
        self.datum = self.policy.get('datum')  

    def process(self, clipboard):
        pass                             # replace with the stage's parallel work


Single Class for Serial and Parallel Parts of Processing

It is okay if the serial and parallel parts reside in the same subclass. While this practice is discouraged for new stage implementations, it is likely that legacy Stage subclasses will be upgraded in this way. It would require that the subclass inherit from both SerialProcessing and ParallelProcessing.
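A minimal sketch of such a combined class follows. The base classes are simplified stand-ins written for this example (the real constructor takes more arguments), and LegacyStage is a hypothetical name. The example also shows why the practice is discouraged: the Pipeline and each Slice hold separate instances, so state recorded on one side is not visible on the other.

```python
class StageProcessing:
    """Simplified stand-in for the harness base class."""

    def __init__(self, policy=None, rank=-1):
        self.policy = policy if policy is not None else {}
        self.rank = rank
        self.setup()

    def setup(self):
        pass


class SerialProcessing(StageProcessing):
    def preprocess(self, clipboard):
        raise RuntimeError("preprocess() not implemented")

    def postprocess(self, clipboard):
        raise RuntimeError("postprocess() not implemented")


class ParallelProcessing(StageProcessing):
    def process(self, clipboard):
        raise RuntimeError("process() not implemented")


class LegacyStage(SerialProcessing, ParallelProcessing):
    """Hypothetical upgraded legacy stage with both parts in one class."""

    def setup(self):
        self.calls = []   # per-instance record of what ran

    def preprocess(self, clipboard):
        self.calls.append("preprocess")

    def process(self, clipboard):
        self.calls.append("process")

    def postprocess(self, clipboard):
        self.calls.append("postprocess")


# The Pipeline and a Slice would each create their own instance.
serial_side = LegacyStage(rank=-1)
parallel_side = LegacyStage(rank=0)
serial_side.preprocess({})
parallel_side.process({})
serial_side.postprocess({})
```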

Common Implementations for SerialProcessing and ParallelProcessing

A developer may wish to have separate SerialProcessing and ParallelProcessing subclasses share a common implementation for a member function. This could be done either via a delegate class that is held by both subclasses or via multiple inheritance, e.g.:

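import lsst.pex.harness.stage as harnessStage

class MyStageBase(harnessStage.StageProcessing):
    def __init__(self, policy, runId, rank, stageId=-1, name=None):
        # pass policy through as well (assumes it is the first StageProcessing argument)
        harnessStage.StageProcessing.__init__(self, policy, runId, rank, stageId, name)

class MyParallelProcessing(harnessStage.ParallelProcessing, MyStageBase):
    def __init__(self, policy, runId, rank, stageId=-1, name=None):
        # arguments must match the MyStageBase signature, including policy
        MyStageBase.__init__(self, policy, runId, rank, stageId, name)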
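The delegate-class alternative could look roughly like the sketch below. The base classes are simplified stand-ins for the harness classes, and SharedCalibration, the "gain" policy entry, and the clipboard keys are all hypothetical names chosen for illustration.

```python
class StageProcessing:
    """Simplified stand-in for the harness base class."""

    def __init__(self, policy=None):
        self.policy = policy if policy is not None else {}
        self.setup()

    def setup(self):
        pass


class SerialProcessing(StageProcessing):
    pass


class ParallelProcessing(StageProcessing):
    pass


class SharedCalibration:
    """Hypothetical delegate holding the logic common to both parts."""

    def __init__(self, policy):
        self.gain = policy.get("gain", 1.0)

    def apply(self, value):
        return value * self.gain


class MySerialPart(SerialProcessing):
    def setup(self):
        self.calib = SharedCalibration(self.policy)   # each part holds its own delegate

    def preprocess(self, clipboard):
        clipboard["reference"] = self.calib.apply(clipboard["reference"])

    def postprocess(self, clipboard):
        pass


class MyParallelPart(ParallelProcessing):
    def setup(self):
        self.calib = SharedCalibration(self.policy)

    def process(self, clipboard):
        clipboard["pixel"] = self.calib.apply(clipboard["pixel"])


policy = {"gain": 2.0}
board = {"reference": 1.5, "pixel": 4.0}   # a dict stands in for a clipboard
MySerialPart(policy).preprocess(board)
MyParallelPart(policy).process(board)
```

Compared with multiple inheritance, the delegate keeps the two stage classes free of constructor chaining, at the cost of one extra object per instance.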

Alternative Queue Interaction Modes

As a convenience to developers, it is no longer necessary to explicitly "get" and "put" a clipboard from/onto queues. In this new design, a single clipboard is passed into the stage. Thus, simply overriding process(), preprocess(), or postprocess() assumes that only one clipboard should be processed per stage execution. One can deviate from this assumption by overriding applyProcess(), applyPreprocess(), and/or applyPostprocess().

For example, by overriding applyProcess(), a stage could process all clipboards available on a queue before the stage iteration is complete. To do this, the new applyProcess() must:

  • retrieve each clipboard from the input queue explicitly
  • call process() for each clipboard, passing it in as the input parameter
  • put the updated clipboard onto the output queue, in order.
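The three steps listed here can be sketched as follows. This is an illustration under stated assumptions: the standard queue.Queue stands in for the harness queues, the inputQueue and outputQueue attribute names and the no-argument applyProcess() signature are hypothetical, and DrainAllStage is an invented example class.

```python
import queue


class ParallelProcessing:
    """Simplified stand-in for the harness class."""

    def __init__(self, inputQueue, outputQueue):
        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def process(self, clipboard):
        raise RuntimeError("process() not implemented")

    def applyProcess(self):
        # Default behaviour: exactly one clipboard per stage execution.
        clipboard = self.inputQueue.get_nowait()
        self.process(clipboard)
        self.outputQueue.put(clipboard)


class DrainAllStage(ParallelProcessing):
    """Hypothetical stage that handles every available clipboard."""

    def process(self, clipboard):
        clipboard["seen"] = True

    def applyProcess(self):
        while True:
            try:
                clipboard = self.inputQueue.get_nowait()   # retrieve explicitly
            except queue.Empty:
                break
            self.process(clipboard)                        # process each one
            self.outputQueue.put(clipboard)                # forward in order


inq, outq = queue.Queue(), queue.Queue()
for i in range(3):
    inq.put({"id": i})
DrainAllStage(inq, outq).applyProcess()
results = [outq.get_nowait() for _ in range(outq.qsize())]
```

Since both queues are first-in, first-out, the clipboards leave the stage in the order they arrived.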

For SerialProcessing, clipboard handling is slightly different. The new applyPreprocess() should:

  • create a new intermediate Queue object
  • retrieve each clipboard from the input queue explicitly
  • call preprocess() for each clipboard, passing it in as the input parameter
  • put the updated clipboard onto the new intermediate queue, in order.
  • return the intermediate Queue object upon function exit.

By default, applyPostprocess() receives the above-mentioned intermediate queue as input; for each clipboard in that queue, it calls postprocess() and then adds the clipboard to the output queue. Thus, it is usually not necessary to override applyPostprocess().
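The serial-side flow could be sketched as below. As before, queue.Queue stands in for the harness queues, the inputQueue and outputQueue attributes and the method signatures are assumptions, and the default applyPostprocess() shown is only a guess at the behaviour described in the text.

```python
import queue


class SerialProcessing:
    """Simplified stand-in for the harness class."""

    def __init__(self, inputQueue, outputQueue):
        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def preprocess(self, clipboard):
        raise RuntimeError("preprocess() not implemented")

    def postprocess(self, clipboard):
        raise RuntimeError("postprocess() not implemented")

    def applyPostprocess(self, interQueue):
        # Assumed default: postprocess every clipboard and pass it on.
        while not interQueue.empty():
            clipboard = interQueue.get_nowait()
            self.postprocess(clipboard)
            self.outputQueue.put(clipboard)


class DrainAllSerial(SerialProcessing):
    """Hypothetical serial part that handles every available clipboard."""

    def preprocess(self, clipboard):
        clipboard["steps"] = ["pre"]

    def postprocess(self, clipboard):
        clipboard["steps"].append("post")

    def applyPreprocess(self):
        interQueue = queue.Queue()                     # new intermediate queue
        while not self.inputQueue.empty():
            clipboard = self.inputQueue.get_nowait()   # retrieve explicitly
            self.preprocess(clipboard)
            interQueue.put(clipboard)                  # keep arrival order
        return interQueue                              # handed to applyPostprocess()


inq, outq = queue.Queue(), queue.Queue()
for i in range(2):
    inq.put({"id": i})
stage = DrainAllSerial(inq, outq)
inter = stage.applyPreprocess()
# (the parallel part of the stage would run between these two calls)
stage.applyPostprocess(inter)
done = [outq.get_nowait() for _ in range(outq.qsize())]
```

Only applyPreprocess() is overridden here; the inherited applyPostprocess() already iterates over whatever the intermediate queue contains.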