
Running the Pipeline Harness for DC3a testing

Testing an application Stage within a Pipeline: Template example

The template example demonstrates how to set up a simple Pipeline with a single Slice executing a single Stage. This example is found under pex_harness/examples/template/.

The template example has a single application stage, SampleStage (found in pex_harness/examples/stages/lsst/pexhexamples/pipeline.py). The full path to the directory pex_harness/examples/stages is added to the PYTHONPATH by the executing run.sh script, so the stage can be loaded dynamically into the Python context by the Pipeline (see the sketch after the list below). SampleStage is a dummy application stage that performs a few simple tasks:

  • overrides the preprocess(), process(), and postprocess() methods of the generic Stage,
  • retrieves the Clipboard from its inputQueue at the start of the process() method, and posts it to the outputQueue at the end, ushering the Clipboard on to the next Stage that a Slice will process,
  • retrieves the Clipboard from its inputQueue at the start of the preprocess() method, and posts it to the outputQueue at the end of postprocess(), passing the Clipboard to the next Stage for serial processing in the main Pipeline,
  • creates an instance of the Log,
  • retrieves elements from its stage policy file and writes them to the Log.
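
The dynamic loading can be pictured with a minimal sketch (illustrative only, using standard Python import machinery; the Pipeline's actual loading code may differ):

import importlib

def loadStageClass(fullName):
    """Resolve a dotted stage name such as
    'lsst.pexhexamples.pipeline.SampleStage' into a class object.
    This works only if the stages directory is on the PYTHONPATH."""
    moduleName, className = fullName.rsplit(".", 1)
    module = importlib.import_module(moduleName)
    return getattr(module, className)

# stageClass = loadStageClass("lsst.pexhexamples.pipeline.SampleStage")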

As such, SampleStage can serve as a useful template for building other application Stages that perform actual computation. The source for SampleStage:

from lsst.pex.harness.Stage import Stage
import lsst.pex.harness.Utils
from lsst.pex.logging import Log, LogRec
import lsst.daf.base as dafBase
from lsst.daf.base import *

class SampleStage(Stage):

    def preprocess(self):
        """
        Processing code for this Stage to be executed by the main Pipeline
        prior to invoking Slice process
        """
        self.activeClipboard = self.inputQueue.getNextDataset()

        root = Log.getDefaultLog()
        log = Log(root, "lsst.pexhexamples.pipeline.SampleStage.preprocess")

        log.log(Log.INFO, 'SampleStage preprocess')

    def postprocess(self):
        """
        Processing code for this Stage to be executed by the main Pipeline
        after the completion of Slice process
        """
        root = Log.getDefaultLog()

        log = Log(root, "lsst.pexhexamples.pipeline.SampleStage.postprocess")
        log.log(Log.INFO, 'SampleStage postprocess')

        self.outputQueue.addDataset(self.activeClipboard)

    def process(self):

        self.activeClipboard = self.inputQueue.getNextDataset()

        root = Log.getDefaultLog()
        log = Log(root, "lsst.pexhexamples.pipeline.SampleStage.process")

        value = "None"
        if self._policy.exists('RunMode'):
            value = self._policy.getString('RunMode')

        lr = LogRec(log, Log.INFO)
        lr << " rank " + str(self._rank)
        lr << " stageId " + str(self.stageId)
        lr << " universeSize " + str(self._universeSize)
        lr << " RunMode from Policy " + value
        lr << LogRec.endr

        self.outputQueue.addDataset(self.activeClipboard)
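
Though not shown above, the pipeline policy file (template_policy.paf) is what names this stage so that the harness can locate it. Based on the appStage format of the ring.paf file shown later on this page, such an entry would look roughly like the following sketch (the stage policy filename here is hypothetical):

appStage: {
    stageName: "lsst.pexhexamples.pipeline.SampleStage"
    eventTopic: "None"
    stagePolicy: "sampleStagePolicy.paf"   # hypothetical stage policy file
}

The stage policy file would then define a value such as RunMode: "process", which is the value echoed in the log output shown below.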

By default this example runs the Pipeline with one Slice on a single node. Edit the MPI machinefile "nodelist.scr" to specify the hostname of the current host.
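
For the default single-node run with one Slice, an entry of the following form suffices (the hostname is a placeholder; the count of 2 covers the 1 Pipeline process plus the 1 Slice, matching the universeSize 2 seen in the log output below):

myhost.example.org:2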

The pipeline is then executed via

% ./run.sh template_policy.paf <some-run-id>

such as

% ./run.sh template_policy.paf test_1090

After execution, the messages from the SampleStage process() method

lsst.pexhexamples.pipeline.SampleStage.process:  rank 0
lsst.pexhexamples.pipeline.SampleStage.process:  stageId 1
lsst.pexhexamples.pipeline.SampleStage.process:  universeSize 2
lsst.pexhexamples.pipeline.SampleStage.process:  RunMode from Policy process

are located within Slice0.log, and the messages

...
lsst.pexhexamples.pipeline.SampleStage.preprocess: SampleStage preprocess
...
lsst.pexhexamples.pipeline.SampleStage.postprocess: SampleStage postprocess

are located within Pipeline.log.

Although this example does not use events, the events system does make use of an ActiveMQ broker during initialization. If an ActiveMQ broker other than the LSST default (lsst8.ncsa.uiuc.edu) is used, it must be specified in the pipeline policy file under "eventBrokerHost".
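
For instance, to point the pipeline at a different broker, the policy file would carry a line of the following form (the hostname is a placeholder):

eventBrokerHost: "mybroker.example.org"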

Running an example interSlice communication pipeline: examples/ring/

This example is found under pex_harness/examples/ring/. Change to this directory to find the files:
% ls
nodelist.scr  ring.paf  run.sh

run.sh is a very simple launching script for testing a Pipeline. (A more configurable mechanism for the setup and launching of pipelines is developed within the dc2pipe/dc3pipe packages and the full orchestration layer.) At the top of run.sh one finds input parameters that one can edit:

nodes=1
nslices=5

These default parameters of a single node and five slices are sufficient to illustrate this ring topology example.

The MPI "machine file" is called nodelist.scr and contains the entry

lsst8.ncsa.uiuc.edu:6

The parameter value 6 that appears after the hostname specifies the total number of MPI processes on the host, and so equals the sum of the 1 Pipeline process and the 5 Slices.
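
For a hypothetical two-node run (nodes=2 with the same five slices), the six processes would be spread over two machinefile entries, for instance (hostnames are placeholders, and the exact split depends on the MPI implementation and on run.sh):

host1.example.org:3
host2.example.org:3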

The pipeline can then be launched by placing the policy file and a runid on the command line:

% ./run.sh ring.paf test_10001

To understand the function of this pipeline we can examine pieces of the ring.paf policy file:

localLogMode: true

shareDataOn: true

topology: {
    type: "ring"
    param1: "clockwise"
    # param1: "counterclockwise"
}

appStage: {
    stageName: "lsst.pexhexamples.apps.SyncSetupStage"
    eventTopic: "None"
    stagePolicy: "None"
    shareData: false
}

appStage: {
    stageName: "lsst.pexhexamples.apps.SyncTestStage"
    eventTopic: "None"
    stagePolicy: "None"
    shareData: true
}

InterSlice communication is enabled globally with shareDataOn: true, and the topology is set to a ring with attribute "clockwise". In this setup the Slices 0 1 2 3 4 are arranged in a line with the ends connected; hence Slice 0 has neighbors 4 and 1, Slice 1 has neighbors 0 and 2, etc. The communication has the character that Slice 0 receives from Slice 4 and sends to Slice 1, Slice 1 receives from Slice 0 and sends to Slice 2, and so on around the ring. Because the option localLogMode: true is set, after execution the log files Pipeline.log, Slice0.log, Slice1.log, ... will be found in the working directory.
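
The neighbor relationship is just modular arithmetic on the Slice ranks; the following is purely illustrative Python, not harness code:

def ringNeighbors(rank, nslices):
    """In a clockwise ring each Slice receives from the previous rank
    and sends to the next one, with wraparound at the ends."""
    receiveFrom = (rank - 1) % nslices
    sendTo = (rank + 1) % nslices
    return receiveFrom, sendTo

# With nslices = 5: rank 0 yields (4, 1), rank 1 yields (0, 2), etc.
for rank in range(5):
    print(rank, ringNeighbors(rank, 5))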

The pipeline contains two Stages, SyncSetupStage and SyncTestStage. Within its process() method SyncSetupStage places a PropertySet on the Clipboard under the key "rankKey" and marks it as Shared:

        propertySet = dafBase.PropertySet()

        propertySet.setInt("sliceRank", self._rank)
        propertySet.setString("Level", "Debug")

        self.activeClipboard.put("rankKey", propertySet)

        self.activeClipboard.setShared("rankKey", True)

The next Stage, SyncTestStage, has the attribute shareData: true set, which ensures that all items marked as shared on the Clipboard are communicated prior to the execution of the process() method for this Stage. Each received PropertySet is placed on the Clipboard under a key tagged with the rank of the neighbor Slice, e.g., rankKey-0 where Slice 1 receives data from Slice 0.
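
The actual SyncTestStage source ships with the examples; as a rough illustration of consuming the shared data, a Stage's process() might look up the neighbor-tagged keys along the following lines (a sketch that assumes the Clipboard exposes a getKeys() method; adapt to the real API):

    def process(self):
        self.activeClipboard = self.inputQueue.getNextDataset()

        # After the harness has shared the Clipboard items, each neighbor's
        # PropertySet sits under a key of the form "rankKey-<neighborRank>".
        for key in self.activeClipboard.getKeys():   # getKeys() is an assumption
            if key.startswith("rankKey-"):
                propertySet = self.activeClipboard.get(key)
                neighborRank = propertySet.getInt("sliceRank")
                # ... make use of the neighbor's data here ...

        self.outputQueue.addDataset(self.activeClipboard)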