
I/O Stages for Persistence


Pipeline stages should expect their main data inputs to be present on their input queues, and they should place their output data on their output queues. This means that the persistence framework must be invoked by the pipeline framework in order to retrieve the proper input data and persist the correct output data. The mechanism for doing this is a specialized set of stages: an InputStage and an OutputStage.

These I/O Stages are defined in the Python module lsst.pex.harness.IOStage.

Common Behavior

I/O Stages are configured by a policy, just like all other Stages. This policy is validated against the stage's dictionary. Several of the parameters that can be specified by this policy are common between the stages, and several other stage behaviors are also common.

runMode parameter

The "parameters.runMode" string can have values "preprocess" or "postprocess". This string controls when the input or output occurs for serial processing versions of the I/O stages. Parallel processing versions can only have I/O execute during the "process" phase of stage execution.

butler parameter

If a data butler is to be used to retrieve or persist data, it may be configured with a sub-policy specified by "parameters.butler".
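
A minimal butler sub-policy, patterned on the complete samples later on this page, might look like the following (the mapper name and root value are illustrative):

parameters: {
    butler: {
        mapperName: lsst.obs.lsstSim.LsstSimMapper
        mapperPolicy: {
            root: "/lsst/data/run1234"
        }
    }
}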

jobIdentity inputKey

The "inputKeys.jobIdentity" string defines the clipboard key under which is found the job identity Python dictionary produced by the JobOffice component of the orchestration layer.

persistence parameter

A Policy may be passed to the persistence framework instance used by the I/O Stage by including it as a sub-policy under the key "parameters.persistence". See PersistenceHowTo for more details on how this policy may be used to configure Persistence, Storages, and Formatters. If no Persistence policy is given, an empty one will be used.

additionalData parameter

The mechanism used by the persistence framework for passing execution context information to formatters in order to enable generation of things like database table names or column values is the additionalData parameter to persist() and retrieve(). This parameter is a PropertySet.

Four keys are predefined in this PropertySet and are set by the I/O Stage: sliceId, the MPI rank of the slice process; universeSize, the total number of MPI processes; runId, the string identifying the run; and itemName, the name on the clipboard of the item being persisted or retrieved.

Other keys may be set with values extracted from the stage's input queue's clipboard. To do this, the policy key "parameters.additionalData" is defined as an array of strings. Each string has the form "additionalData-key=clipboard-key". The clipboard-key is used to retrieve an arbitrary value from the clipboard. If the clipboard-key contains a dot ('.'), the part preceding the dot is used as the name of a PropertySet on the clipboard; the part after the dot is used as a key within that PropertySet. This is typically used to retrieve values from within events on the clipboard, where the event topic is the name on the clipboard. The additionalData-key is the name that the value will have within the additionalData PropertySet parameter.
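
As a sketch, the following fragment (the event topic "triggerVisitEvent" and its keys are hypothetical) copies two values from an event on the clipboard into the additionalData PropertySet under the names visitId and ccdId:

parameters: {
    additionalData: "visitId=triggerVisitEvent.visitId" "ccdId=triggerVisitEvent.ccdId"
}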

In particular, formatters may expect the visitId key to be set, so that key should typically be specified in additionalData. Nevertheless, the "parameters.additionalData" policy key itself is optional.

If the clipboard-key is not present on the clipboard, an exception will be raised.

storagePolicy parameter

This is the "traditional" DC2/DC3a way of specifying data locations. It may continue to be used, but it is intended that the data butler will be the preferred mechanism in the future.

I/O Stages are used to persist or retrieve certain named clipboard items. These items are named in the "parameters.inputItems" or "parameters.outputItems" sub-policies. For each such item, a "storagePolicy" key may be specified. This key contains an array of sub-policies, each of which contains two keys: "storage" and "location". "storage" specifies the name of a Storage subclass as provided by the persistence framework; "BoostStorage", "DbStorage", "DbTsvStorage", and "FitsStorage" are expected to be used most often. "location" specifies a storage-specific string to be provided as a LogicalLocation. The formats of these strings are documented in PersistenceHowTo#Storage; they are typically either a pathname or a database connection string. The InputStage or OutputStage will use each storage and location pair in turn to retrieve or persist data.

The I/O Stages provide an additional mechanism for incorporating execution context information into the persistence framework. String, integer, and float values from the additionalData PropertySet described above may be substituted into the "location" string. The syntax used is "%[format](additionalData-key)". The optional format is one appropriate for the Boost format library. Note that this syntax is similar to but not identical to the Python string substitution syntax.

For example, a storagePolicy might contain the following:

parameters: {
    outputItems: {
        diffIm: {
            storagePolicy: {
                storage: "FitsStorage"
                location: "/lsst/images/%(visitId)/difference1.%03d(sliceId)"
            }
        }
    }
}

In addition to keys from additionalData, substitutions may also occur from a fixed list of locations provided to the LogicalLocation class via the static method setLocationMap(PropertySet::Ptr map). The location map substitutions are intended to be set by the orchestration layer. They take precedence over any additionalData substitutions.
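
As a sketch of how the orchestration layer might install such a map, assuming the Python bindings for PropertySet and LogicalLocation in lsst.daf.base and lsst.daf.persistence (the keys and paths here are illustrative):

from lsst.daf.base import PropertySet
from lsst.daf.persistence import LogicalLocation

# Build the fixed substitution map; these keys and values are illustrative.
locMap = PropertySet()
locMap.set("input", "/lsst/DC3root/run1234")
locMap.set("output", "/lsst/DC3root/run1234/output")

# Install the map; its entries take precedence over additionalData keys
# when substituted into a "location" string such as "%(input)/raw.fits".
LogicalLocation.setLocationMap(locMap)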

If the specified additionalData-key has not been set in either the additionalData or the location map, or if it is not one of the supported types, an exception will be raised.

InputStage

The InputStage is used to retrieve data for use by subsequent Stages. The data to be retrieved is specified by a "parameters.inputItems" sub-policy within the Stage's policy. The inputItems policy in turn contains a single sub-policy for each dataset, with the name of the sub-policy being the key with which the dataset will be associated in the InputStage's output queue's clipboard.

There are three ways of specifying the sub-policy for each dataset.

  1. The traditional method using type, pythonType, and storagePolicy.
  2. An enhancement of the traditional method using multiple location substitutions provided by the orchestration layer.
  3. The data butler method.

Traditional Method

The dataset policy contains three keys: "type", which gives the name of the Persistable subclass to be retrieved (as registered by that subclass's Formatter); "pythonType", which is the fully-qualified Python name for the type of the object to be retrieved; and "storagePolicy", which is described above.
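
A traditional inputItems entry might therefore look like the following sketch (the item name, the "ExposureF" type names, and the location are illustrative, not prescribed):

parameters: {
    inputItems: {
        scienceExposure: {
            type: "ExposureF"
            pythonType: "lsst.afw.image.ExposureF"
            storagePolicy: {
                storage: "FitsStorage"
                location: "/lsst/images/%(visitId)/science.fits"
            }
        }
    }
}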

Enhanced Method

In addition to the "type" and "pythonType" keys, the dataset policy also contains "datasetType", "storage", and "location". "storage" and "location" are the same as for the traditional method's "storagePolicy", but there can only be one pair. "datasetType" specifies the type of dataset to be retrieved (such as "raw", "bias", "postISR", "visitim", or "calexp"). A Python list is retrieved from the clipboard under the key named in "inputKeys.inputDatasets" and then searched for dataset specifiers of the given type. This list is normally placed on the clipboard by the JobOffice component of the orchestration layer. Any matching dataset specifiers are used to create additionalData PropertySets, which can then be substituted into the provided "location" string. If multiple dataset specifiers match the given type, all of the resulting retrieved objects will be returned in a single Python list.
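
A sketch of an enhanced-method entry might look like the following (all names and the location template are illustrative):

parameters: {
    inputItems: {
        calibratedExposure: {
            type: "ExposureF"
            pythonType: "lsst.afw.image.ExposureF"
            datasetType: "calexp"
            storage: "FitsStorage"
            location: "%(input)/calexp/v%(visit)-s%(sensor).fits"
        }
    }
}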

Butler Method

The "type" and "pythonType" keys are not needed. Instead, a "datasetType" key and either "fromInputDatasets" or "fromJobIdentity" key are provided. The "datasetType" is the same as for the enhanced method above. If the "fromInputDatasets" key is specified with a "true" value, the "inputKeys.inputDatasets" is searched as above and the resulting specifiers are used with the butler to retrieve the dataset(s). If the "fromJobIdentity" key is specified with an array of strings, the values with those names are retrieved from the "inputKeys.jobIdentity" dictionary on the clipboard and used with the butler to retrieve the dataset(s).

Note that the data butler may also perform standardization of the input dataset. The other two methods cannot perform this standardization step.

A sample InputStage policy using the butler method looks like:

inputKeys: {
    inputDatasets: inputDatasets
    jobIdentity: jobIdentity
}
parameters: {
    butler: {
        mapperName: lsst.obs.lsstSim.LsstSimMapper
        mapperPolicy: {
            root: "%(input)"
        }
    }
    inputItems: {
        Exposure: {
            datasetType: raw
            datasetId: {
                fromInputDatasets: true
            }
        }
        BiasExposure: {
            datasetType: bias
            datasetId: {
                fromJobIdentity: "visit" "snap" "raft" "sensor" "channel"
            }
        }
    }
}

OutputStage

The OutputStage is used to persist data after execution of a Stage. The data to be persisted is specified by a "parameters.outputItems" sub-policy within the Stage's policy. The outputItems policy in turn contains a single sub-policy for each dataset, with the name of the sub-policy being the clipboard key under which the dataset is found.

There are three ways of specifying the sub-policy for each dataset.

  1. The traditional method using storagePolicy.
  2. An enhancement of the traditional method that communicates with the JobOffice via the "outputDatasets" list on the clipboard.
  3. The data butler method.

Traditional Method

The dataset policy contains up to two keys: the optional key "required", which, if present, is a boolean specifying whether the dataset must be present, and "storagePolicy", which is described above. Note that the "type" and "pythonType" keys do not need to be specified on output.

If "required" is specified for a dataset with the value "true", then the absence of that dataset on the OutputStage's input queue's clipboard will cause an exception to be raised.
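
A traditional outputItems entry might look like the following sketch (the item name and location are illustrative):

parameters: {
    outputItems: {
        diffIm: {
            required: true
            storagePolicy: {
                storage: "BoostStorage"
                location: "%(output)/diffIm/v%(visitId)/difference.boost"
            }
        }
    }
}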

Enhanced Method

The JobOffice needs to know which datasets have been successfully computed and persisted. This communication occurs through a Python list on the clipboard, the key for which is specified by "outputKeys.outputDatasets". The default value for this key is just "outputDatasets".

To specify the output dataset, a "datasetId" sub-policy can be added to the dataset policy. This sub-policy can contain three keys: "datasetType", which indicates the type of the dataset being output (e.g. "postISR", "visitim", "calexp", etc.); optionally "set", which contains a sub-policy of key/value pairs to be set in the dataset identifier; and optionally "fromClipboard", which contains an array of strings. The "fromClipboard" strings are used as keys into a Python dictionary on the clipboard, the key for which is specified by "inputKeys.jobIdentity" (default value "jobIdentity"). Each value in the dictionary is copied into the dataset identifier under the same key. As mentioned above, the final dataset identifier resulting from the "set" and "fromClipboard" manipulations is appended to the list on the clipboard under the key in "outputKeys.outputDatasets".

If "datasetId" is specified, all of the keys used to populate the dataset id are also added to the additionalData as a convenience.

A sample OutputStage policy using a "datasetId" sub-policy might look like:

inputKeys: {
    jobIdentity: "jobIdentity"
}
outputKeys: {
    outputDatasets: "outputDatasets"
}
parameters: {
    outputItems: {
        ps: {
            required: true
            datasetId: {
                datasetType: postISR
                fromClipboard: "visit" "ccd" "amp"
                set: {
                    field: deep
                }
            }
            storagePolicy: {
                storage: "BoostStorage"
                location: "%(output)/PS-%(field)-v%(visit)-c%(ccd)-a%(amp).boost"
            }
        }
    }
}

Butler Method

The "storagePolicy" sub-policy is not needed. Instead, the dataset id as constructed according to the "datasetId" sub-policy is used by the data butler to determine where and how to persist the dataset.

A sample OutputStage policy using the butler method might look like:

parameters: {
    butler: {
        mapperName: lsst.obs.lsstSim.LsstSimMapper
        mapperPolicy: {
            root: "%(input)"
        }
    }
    outputItems: {
        ps: {
            required: true
            datasetId: {
                datasetType: postISR
                fromClipboard: "visit" "ccd" "amp"
                set: {
                    field: deep
                }
            }
        }
    }
}