Changes between Version 9 and Version 10 of PipelineFramework


Timestamp:
08/15/2007 03:55:11 PM (12 years ago)
Author:
RayPlante
Comment:

--

  • PipelineFramework

    v9 v10  
    2828      3. a serial post-processing or clean-up step.  A common use of this step would be to record the ''provenance'' (the metadata describing what was done) of the stage. 
    2929 
    30 Another concept to come out of this modeling effort is the notion of a ''slice''.  An important concern for optimizing a pipeline is minimizing I/O costs.  One way to save on I/O costs is to keep data that is used by multiple stages in memory.  For example, the image processing pipeline might be parallelized by processing each CCD from the focal plane array on a different processor.  On one particular node, a CCD image can be kept in memory for as long as it is needed by the chain of pipeline stages.  It does require, then, that all stages of the pipeline will run on a particular node and operate on the same CCD image.   
    31 A ''slice'' of the pipeline, therefore, is a set of stage instances (running inside a worker process) that will operate on the same data-parallel chunk of data.   
     30Another concept to come out of this modeling effort is the notion of a ''slice''.  An important concern for optimizing a pipeline is minimizing I/O costs.  One way to save on I/O costs is to keep data that is used by multiple stages in memory.  For example, the image processing pipeline might be parallelized by processing each CCD from the focal plane array on a different processor.  On one particular node, a CCD image can be kept in memory for as long as it is needed by the chain of pipeline stages.  This implies, then, that a single processor will host a sequence of stages that operate on the same CCD (or set of CCDs).  A ''slice'' of the pipeline, therefore, is that set of stage instances (running inside a worker process) that will operate on the same data-parallel chunk of data.   
    3231 
    3332== The Components of a Pipeline == 
     
    8180Note that there is no provision in the model (as of yet) that allows parallel Slices to share data with each other (other than via the combined gather-scatter operation described above), as this breaks the data-parallel processing model and would require implementing additional MPI communication patterns.  If this is necessary, a Stage implementation would be given access to the MPI capabilities more directly. 
    8281 
    83 Pipeline and Slice objects will also be responsible for taking any data remaining on a Clipboard when it arrives on the !OutputQueue of the last stage and persisting it according to the Pipeline's policy.  (In general, data on a clipboard could be persisted between any of the stages.)  Some of the data on Clipboard will be destroyed while other data can remain to be available on the !InputQueue of the first stage again.  In this way, the Clipboard can keep some information from previous traverses through the Pipeline. 
     82Pipeline and Slice objects are responsible for initializing the Clipboard at the beginning of the Pipeline.  They are also responsible for taking any data remaining on a Clipboard when it arrives on the !OutputQueue of the last stage and persisting it according to the Pipeline's policy.  (In general, data on a clipboard could be persisted between any of the stages.)  Some of the data on the Clipboard will be destroyed, while other data can remain to be available on the !InputQueue of the first stage again.  In this way, the Clipboard can keep some information from previous traverses through the Pipeline.  The work of cleaning up a Clipboard at the end of a pipeline and its re-initialization for use at the beginning will be delegated to a special implementation of a Queue.  In particular, the special Queue implementation that will handle the end points of the Nightly Processing Pipeline will be responsible for receiving the new images from the telescope, instantiating them as Image objects, and placing them on the Clipboard as input to the first stage.  How the data is received is not specified in the framework design but is rather encapsulated in the special Queue implementation (it may monitor some directory, or it may respond to an event that points it to new data on disk). 
    8483 
    8584Thus, we envision the Clipboard as an object that is initialized for the first stage and then travels through stages (via the Queues), picking up new data items along the way.  At the end of the pipeline, output products are saved and the Clipboard is re-initialized for the next set of data to be processed.   
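The Clipboard lifecycle described above could be sketched as a special end-point Queue that persists leftover data, carries selected items over, and injects the next set of input.  This is a hypothetical illustration: the class and method names (`PipelineEndpointQueue`, `transfer()`, the persister and data-source helpers) are stand-ins, not the framework's actual API.

```python
# Hypothetical sketch of the special Queue joining the end of the pipeline
# back to its start.  All names are illustrative; how new data actually
# arrives (directory watching, events) is deliberately left abstract.

class PipelineEndpointQueue:
    def __init__(self, persister, data_source, keep=()):
        self.persister = persister      # persists items per the Pipeline policy
        self.data_source = data_source  # e.g. watches a directory for new data
        self.keep = set(keep)           # item names carried over between runs

    def transfer(self, outgoing_clipboard):
        # Persist whatever is left on the Clipboard at the end of a run...
        for name, item in outgoing_clipboard.items():
            self.persister.save(name, item)
        # ...keep selected items, drop the rest, and add the next input data.
        board = {k: v for k, v in outgoing_clipboard.items() if k in self.keep}
        board.update(self.data_source.next_input())
        return board

# Minimal stubs to demonstrate the hand-off:
class _Recorder:
    def __init__(self): self.saved = {}
    def save(self, name, item): self.saved[name] = item

class _Source:
    def next_input(self): return {"rawImage": "exposure-001"}

recorder = _Recorder()
q = PipelineEndpointQueue(recorder, _Source(), keep=("calibration",))
board = q.transfer({"calibration": "flat-field", "diffImage": "d1"})
```

In this sketch, the persisted items and the carried-over `calibration` item show how a Clipboard can keep information from a previous traverse through the Pipeline.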
     
    9594== The Full Pipeline Sequence == 
    9695 
    97 Now that we have discussed how the pipeline framework manages the data flow, it is worthwhile to revisit the operation sequence presented above. 
     96Now that we have discussed how the pipeline framework manages the data flow, it is worthwhile to revisit the operation sequence presented in [#TheComponentsofthePipeline the introduction to the pipeline components above].   
     97 
     98   1. The Pipeline and the Slice objects are created and configured according to a given Policy.  As part of the configuration, the Pipeline notes the events that it needs to receive and when.  The Pipeline initializes a stage position counter, ''N'', that indicates which Stage is to be executed next, setting it to the first stage. 
     99   1. The Pipeline object tells the Queue between stage ''N-1'' and ''N'' to transfer the Clipboard from the output side to the input side.  For the first stage (when ''N''=1), this means initializing the Clipboard with the new data for the Pipeline. 
     100   1. The Pipeline object receives any events it is configured to listen for prior to the execution of stage ''N''.  If so configured, it waits until each such event has been received.  Upon receipt, it places any data contained in the event on the Clipboard for stage ''N''.   
     101   1. The Pipeline object calls `preprocess()` on its instance of Stage ''N''. 
     102   1. After the `preprocess()` is complete, the Pipeline object inspects the Clipboard on the Stage's !OutputQueue for sharable data.  It then sends an MPI scatter message out to all of the worker processes telling them to execute the parallel portion of stage ''N''; the message is followed by a serialization of any sharable data.  
     103   1. The Slice object in each worker process receives the message.  It tells the Queue between stage ''N-1'' and ''N'' to transfer the Clipboard from the output side to the input side.  It then deserializes any sharable data items it receives from the master process and places them on the !InputQueue for stage ''N'' (with the sharable tag turned off).  It then calls the `process()` method on its instance of the Stage ''N'' object.   
     104   1. After the `process()` method completes, each Slice object inspects the Clipboard on the Stage's !OutputQueue for sharable data.  It sends an MPI gather message back to the master process indicating that the parallel processing is done; the message is followed by a serialization of any sharable data. 
     105   1. When the Pipeline object has received the messages from all the worker nodes, it aggregates any received sharable data items, places them back on the master Clipboard, and posts it to the !InputQueue of stage ''N''.  It then executes the `postprocess()` step of stage ''N''.   
     106   1. When the `postprocess()` is done, the Pipeline object can repeat this sequence beginning with step 2 for stage ''N+1'', and so on, until the entire chain of stages is complete.   
     107   1. When the `postprocess()` of the last stage is done, the Pipeline resets its stage position counter ''N'' to the first stage, and starts the sequence again (at step 2) beginning with the first stage for the next set of available input data.  Encapsulated in the transfer of the Clipboard from the end of the pipeline to the start is the persisting of any data items left on the Clipboard, according to the Pipeline policy.   
    98108 
    99109== Configuring a Pipeline and its Components == 
    100110 
     111A Policy Object will be used to configure the Pipeline.  As described in PolicyDesign, the Pipeline Policy will contain the configuration for the Pipeline as a whole as well as for all the component Stages and the components internal to the Stages; this information is organized into a hierarchical tree.  It is a function of ''Pipeline Construction'' to assemble the comprehensive Policy out of default policies for each of the Stages; that is, the pipeline construction system pulls default policies as flat files from a policy library, overrides their parameters as desired, and assembles them into one comprehensive policy file.  It is a function of ''Pipeline Orchestration'' to deploy that comprehensive policy file on each node that will run a pipeline process.   
     112 
     113The kinds of information that would be configured at the Pipeline level include: 
     114   * the stages to be run 
     115   * the topology of the pipeline: how stages are distributed across processors 
     116   * the Queue implementations to use; in particular, the Queue class that handles the input at the beginning of the pipeline and the output at the end.   
     117 
     118Queues would have policies associated with them that control how they pass the Clipboard data from the output side to the input side.  The Queue handling the start and end of the Pipeline in particular would require policy parameters that control: 
     119   * how to receive and instantiate data for placement on the !InputQueue of the first stage 
     120   * what data from the !OutputQueue to persist 
     121 
     122The individual Stages would each have policies associated with them.  These would mostly be specialized to the particular behavior of that Stage; however, some parameters could provide hints to the Pipeline about how the Stage is implemented (e.g. does it provide `preprocess()` and `postprocess()` implementations, will there be sharable data) which the Pipeline can use to optimize the execution of the Stage.   
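To make the hierarchical organization concrete, here is a hypothetical policy tree rendered as a Python dictionary.  The actual file syntax and parameter names are defined by PolicyDesign; everything shown here (the stage, queue, and parameter names) is illustrative only.

```python
# Hypothetical policy tree showing how pipeline-level, Queue, and per-Stage
# configuration nest into one comprehensive hierarchy.  All names invented.

pipeline_policy = {
    "stages": ["ImageSubtractionStage", "DetectionStage"],  # stages to run
    "topology": {"nSlices": 16},          # how stages map onto processors
    "inputQueue": {                        # special Queue at the start
        "class": "DirectoryWatchQueue",    # how new data is received
        "watchDir": "/data/incoming",
    },
    "outputQueue": {
        "persist": ["diffImage"],          # what Clipboard data to persist
    },
    "ImageSubtractionStage": {             # per-Stage policy subtree
        "hasPreprocess": False,            # hint for Pipeline optimization
        "kernelSize": 19,                  # stage-specific parameter
    },
}
```

A subtree such as `pipeline_policy["ImageSubtractionStage"]` is what would be handed to the corresponding Stage's `configure(Policy)` method.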
     123 
    101124== Implementing a Stage == 
    102125 
     126Application-level classes that encode some algorithm are made available to a pipeline by wrapping them in a Stage implementation.  That is, the application developer subclasses the Stage class to make a Stage that applies a particular algorithm.  For example, the developer may create a class called ImageSubtractionStage.  The developer must provide implementations of the virtual functions, including `preprocess()`, `process()`, and `postprocess()`.  (In DC2, many Stage implementations will not need to implement the serial steps.)  The `process()` implementation encodes the logic for applying the algorithm to the input data retrieved from the Clipboard on the !InputQueue.  This could be a "thin" implementation that mainly wraps a single class that does the work or a "thicker" implementation that glues several classes together to apply the algorithm.   
     127 
     128In DC2, Stages are implemented in Python. 
     129 
     130In practice, one of the first things the stage developer will need to do is determine what data the Stage expects to find on its !InputQueue and what data it will provide on its !OutputQueue.  Each data item will be given a name according to the heuristics described [#PassingDataandInformation above].  The Stage documentation must include a full enumeration of these data items.   
     131 
     132The Stage developer will also need to determine what configuration information the Stage will need via a Policy.  This will be documented by creating a Policy Dictionary file (see PolicyDesign).  The Stage developer implements the Stage's `configure(Policy)` method to pull data out of the Policy by the names defined in the Dictionary.  Some of those names may return Policy objects that can be passed to instances of classes the Stage uses internally.   
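Putting these pieces together, a Stage implementation might look like the following sketch.  The base class, the Clipboard handling, and the Policy interface shown here are minimal hypothetical stand-ins for the real framework classes; the item names (`scienceImage`, `templateImage`, `diffImage`) and the `templateScale` parameter are invented for illustration.

```python
# Hypothetical sketch of a Stage implementation following the pattern above.
# The Stage base class and Policy access are simplified stand-ins.

class Stage:
    def __init__(self, policy):
        self.configure(policy)
    def configure(self, policy): pass
    def preprocess(self, clipboard): pass
    def process(self, clipboard): pass
    def postprocess(self, clipboard): pass

class ImageSubtractionStage(Stage):
    """Subtracts a template image from a science image on the Clipboard."""

    def configure(self, policy):
        # Pull parameters by the names declared in the Stage's Policy
        # Dictionary (here, a plain dict stands in for a Policy object).
        self.scale = policy.get("templateScale", 1.0)

    def process(self, clipboard):
        # Input item names form part of the Stage's documented contract.
        science = clipboard["scienceImage"]
        template = clipboard["templateImage"]
        clipboard["diffImage"] = [s - self.scale * t
                                  for s, t in zip(science, template)]

# Usage: images reduced to pixel lists for the sake of the example.
stage = ImageSubtractionStage({"templateScale": 1.0})
board = {"scienceImage": [10.0, 12.0], "templateImage": [9.0, 11.0]}
stage.process(board)
print(board["diffImage"])   # [1.0, 1.0]
```

A "thin" implementation like this mainly delegates to the algorithm code; a "thicker" one would glue several application classes together inside `process()`.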
     133 
     134=== Testing a Stage === 
     135 
     136== Supporting Different Pipeline Topologies ==