Last modified on 01/30/2010 03:31:20 PM

TCT Proposal: Use of Condor/DAGMan in Pipeline Orchestration Layer


We propose to use an existing third-party package, Condor, to execute the complex workflows associated with non-real-time processing (namely, the Data Release Production for DC3b). The package includes a metascheduler, DAGMan, which can carry out workflows described as a directed acyclic graph (DAG).

Package Description

Web Site:
License: Apache, v2.0

Condor is a package for executing, monitoring, and receiving output from a variety of jobs launched on remote machines. In particular, Condor is aware of a whole set of machines (a Condor pool), which may be heterogeneous. The user submits jobs to the Condor scheduler specifying not a particular machine to run on but only the platform requirements. Condor determines which machines match the requirements and are available, and launches jobs accordingly. When Condor manages a large number of platforms running an even larger number of jobs, it excels at maximizing the utilization of those platforms.
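The matching step described above can be illustrated with a toy sketch. This is not Condor's actual matchmaking mechanism; the `Machine` and `Job` classes, their fields, and the machine names are all hypothetical, chosen only to show a job stating requirements rather than naming a host.

```python
from dataclasses import dataclass


@dataclass
class Machine:
    name: str
    arch: str
    memory_mb: int
    busy: bool = False


@dataclass
class Job:
    name: str
    arch: str           # required architecture
    min_memory_mb: int  # minimum memory the job needs


def match(job, pool):
    """Return the first idle machine that satisfies the job's requirements, or None."""
    for machine in pool:
        if (not machine.busy
                and machine.arch == job.arch
                and machine.memory_mb >= job.min_memory_mb):
            return machine
    return None


# A small heterogeneous pool (hypothetical hosts).
pool = [
    Machine("node1", "X86_64", 2048, busy=True),
    Machine("node2", "INTEL", 4096),
    Machine("node3", "X86_64", 8192),
]
job = Job("coadd", arch="X86_64", min_memory_mb=4096)

chosen = match(job, pool)
print(chosen.name if chosen else "no match; job stays queued")
```

Here node1 is busy and node2 has the wrong architecture, so the job lands on node3. A job whose requirements no idle machine meets would simply wait in the queue.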

DAGMan is a tool that comes with Condor for describing and executing a workflow: a set of jobs with dependencies, in which the outputs from some jobs serve as inputs to others. The dependency relationships are described as a directed acyclic graph (DAG). The DAGMan scheduler analyzes the graph and executes the jobs in the proper order.
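To make "executing the jobs in the proper order" concrete, the following sketch orders a small dependency graph with Python's standard `graphlib` module (Python 3.9 or later). It illustrates the general idea of topological ordering, not DAGMan's internals, and the job names are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical job names; each key maps to the set of jobs it depends on.
dependencies = {
    "calibrate_A": set(),
    "calibrate_B": set(),
    "coadd": {"calibrate_A", "calibrate_B"},
    "detect": {"coadd"},
}


def execution_waves(deps):
    """Group jobs into waves; all jobs within a wave may run concurrently."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # jobs whose parents have all finished
        waves.append(ready)
        sorter.done(*ready)                 # mark them complete to release children
    return waves


for number, wave in enumerate(execution_waves(dependencies), start=1):
    print(number, wave)
```

The two calibration jobs have no parents and form the first wave; `coadd` becomes ready only after both finish, and `detect` follows it.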


The real-time processing requirements of the LSST Alert Production led to a harness design that minimizes I/O and process overheads. This design has two shortcomings. First, the tight coupling of long-running processes makes it difficult to recover from various types of processing failures. Second, it makes poor use of cores. The tightly coupled processes must synchronize in order to pass information between them (whether between slices of a pipeline communicating via MPI messages or between pipelines communicating via events). Because different processes complete their work at different rates, the fastest processes leave their cores idle while waiting for data from other processes. In addition to these design issues, the current harness implementation has a further shortcoming: it cannot easily process more slices than the number of available cores.

The Data Release Production is not subject to the real-time requirements; thus, by paying the penalty of additional I/O, we can adopt a workflow model that makes it easier to recover from failures. This model also makes it easier to break up the processing into more purely data-parallel chunks that need not run simultaneously. Furthermore, the Data Release Production is much more complex: the axis of parallelization changes many times over the course of the workflow.

When the processing can be broken up into uncoupled, data-parallel units, no two of which need to run simultaneously, the overall workflow can be executed as independent jobs. In particular, it is easier to keep all cores busy when the number of jobs is much greater than the number of available cores. This is exactly the model provided by Condor. The DAGMan add-on tool handles the dependencies between jobs, allowing data to flow through a complicated plan such as the one needed by the Data Release Production. Furthermore, failed jobs can be more easily rescheduled in this model. Condor provides two mechanisms for dealing with such failures. First, it can automatically resubmit individual failed jobs. Second, one can define so-called failure DAGs: workflows that are engaged when a failure is detected.
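The claim that many small jobs keep cores busier than a few long-running, coupled processes can be checked with a back-of-the-envelope simulation. The sketch below uses simple greedy list scheduling as a stand-in for a real scheduler (it is not how Condor assigns jobs), and the run times are invented for illustration.

```python
import heapq


def makespan(job_times, n_cores):
    """Give each job, in order, to the core that frees up first; return total wall time."""
    free_at = [0] * n_cores  # time at which each core next becomes free
    heapq.heapify(free_at)
    for duration in job_times:
        start = heapq.heappop(free_at)
        heapq.heappush(free_at, start + duration)
    return max(free_at)


def utilization(job_times, n_cores):
    """Fraction of available core time spent doing work."""
    return sum(job_times) / (makespan(job_times, n_cores) * n_cores)


# Four long, uneven processes on four cores: everyone waits for the slowest.
coarse = [100, 120, 200, 80]

# The same total work split into forty independent jobs (hypothetical split).
fine = [10] * 10 + [12] * 10 + [20] * 10 + [8] * 10

print(f"coarse: {utilization(coarse, 4):.3f}")
print(f"fine:   {utilization(fine, 4):.3f}")
```

In the coarse case three cores sit idle until the 200-unit process finishes, so utilization is 500/800 = 0.625. With forty jobs, a core that finishes early picks up the next job, and utilization rises well above that.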

Planned Use

Support for Condor will be provided as a plug-in to the orchestration layer, ctrl_orca. The overall workflow will be described as a DAG. Individual jobs in the DAG will be LSST pipelines that use the pipeline harness (i.e., that are made up of stages). A pipeline will run in one of the following modes:

  • a single-slice pipeline that will process one chunk of data per execution (i.e., a single iteration)
  • a single-slice pipeline that will process a predetermined number of data chunks per execution (i.e., multiple iterations)
  • a multi-slice MPI pipeline that will process multiple chunks in parallel
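As a rough illustration of how a workflow of pipeline jobs might be written down for DAGMan, the sketch below renders a DAGMan input file using its `JOB`, `PARENT ... CHILD`, and `RETRY` keywords. The helper `write_dag`, the job names, and the submit file names are hypothetical; this is not the ctrl_orca plug-in interface, only a sketch of the kind of file such a plug-in could generate.

```python
def write_dag(jobs, edges, retries=0):
    """Render a DAGMan input file as a string.

    jobs:    mapping of job name -> Condor submit description file (hypothetical names)
    edges:   iterable of (parent, child) job-name pairs
    retries: number of automatic retries DAGMan should allow per job
    """
    lines = [f"JOB {name} {submit_file}" for name, submit_file in jobs.items()]
    for parent, child in edges:
        if parent not in jobs or child not in jobs:
            raise ValueError(f"edge refers to unknown job: {parent} -> {child}")
        lines.append(f"PARENT {parent} CHILD {child}")
    if retries > 0:
        lines.extend(f"RETRY {name} {retries}" for name in jobs)
    return "\n".join(lines) + "\n"


# Two single-slice pipeline runs feeding one downstream pipeline (invented example).
jobs = {
    "calib_chunk1": "calib_chunk1.sub",
    "calib_chunk2": "calib_chunk2.sub",
    "coadd": "coadd.sub",
}
edges = [("calib_chunk1", "coadd"), ("calib_chunk2", "coadd")]

print(write_dag(jobs, edges, retries=2), end="")
```

The resulting text could be saved to a file and handed to `condor_submit_dag`. Which of the three pipeline modes a given job uses would be determined by its submit description file, which this sketch does not attempt to model.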