Last modified on 08/02/2007 06:12:33 PM

Integrating the Association Pipeline with the Pipeline Framework

This page documents the design options for integrating the Association Pipeline with the Pipeline Framework.

Pipeline Framework Description

A Pipeline consists of parallel Slices and a master process. Each Pipeline has an initialize() method and a shutdown() method that execute once in the master process.

Each Slice processes a portion of a field of view (FOV) by executing a sequential series of Stages. Each Slice has a single process and address space for all Stages.

The master process controls the execution of the Slices:

  • It waits for an event
  • For each Stage:
    • It executes the Stage's preprocess() method
    • It tells each Slice to execute the Stage's process() method
    • It waits for all Slices to finish
    • It executes the Stage's postprocess() method
  • It waits for the next event
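The control loop above can be sketched in a few lines. This is only an illustration under stated assumptions: the Stage class, the run_once function, and the use of a thread pool in place of separate Slice processes are all hypothetical and are not part of the Pipeline Framework.

```python
from concurrent.futures import ThreadPoolExecutor


class Stage:
    """Hypothetical Stage exposing the three hooks named in the text."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def preprocess(self):
        self.log.append((self.name, "preprocess"))

    def process(self, slice_id):
        # In the real framework each Slice is a separate process;
        # here a worker thread stands in for it.
        return (self.name, "process", slice_id)

    def postprocess(self):
        self.log.append((self.name, "postprocess"))


def run_once(stages, n_slices, log):
    """Run every Stage once, in order, for a single triggering event."""
    with ThreadPoolExecutor(max_workers=n_slices) as pool:
        for stage in stages:
            stage.preprocess()  # master process only
            # Collecting all results blocks until every Slice has finished.
            results = list(pool.map(stage.process, range(n_slices)))
            log.extend(results)
            stage.postprocess()  # master process only


log = []
run_once([Stage("load", log), Stage("match", log)], n_slices=3, log=log)
```

Waiting for an event would wrap run_once in an outer loop; that part is omitted because the event mechanism is not described here.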

Each Stage has an initialize() method and a shutdown() method that execute once in each Slice with no defined order.

The Pipelines, Slices, and Stages are long-lived, created once per night.

Ideally, each Slice should be able to run on an arbitrary machine. This is the case if a database is used to execute the Association Pipeline cross-match. If we do the cross-match in application code, however, we may want to simplify things by sharing memory between Slices. This can be accommodated by arranging for all Slices to execute on the same machine. This could even be the same machine as the master process.

Currently each Pipeline execution runs the Stages in all Slices, and there is a fixed assignment of portions of the FOV to Slices.

Any Stage may send an event. Presumably this would be best handled in the postprocess() method, so that only one event is sent to another Pipeline.

Association Pipeline Components

The Association Pipeline has (at least) three phases of execution, three inputs, two outputs, and the (possibly unique) characteristic that execution of the three phases may require more time than the interval between inputs. This means that multiple instances of the phases may have to be executing at the same time.



Load phase: inputs/triggered by
This phase is triggered by an event coming from the telescope control system. As input, the following information is required:
  • an integer ID for the FOV/visit
  • the expected time at which the FOV/visit will occur
  • FOV center ra
  • FOV center dec
Stage 1
Load Objects and historical DIASources for the portion of the FOV assigned to this slice into memory. Loading can be [partially] skipped if it is determined that the necessary data is already in memory.
Stage 2 [barrier after Stage 1 is required]
Build the zone index for the Objects/historical DIASources in the portion of the FOV assigned to this slice. Note that the zone index will require some data from the slice(s) that own the Objects/DIASources spatially adjacent to those owned by this slice. Then, build a hashtable for all Objects/historical DIASources (keyed by objectId/diaSourceId).
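A minimal sketch of the two in-memory structures from Stage 2 follows. It assumes that zones are fixed-height declination stripes and that each zone is kept sorted by ra; the zone height, the tuple layout, and the function names are illustrative assumptions, not the AP's actual design. Data borrowed from adjacent slices is not modelled.

```python
import math
from collections import defaultdict

ZONE_HEIGHT_DEG = 0.5  # assumed zone height, not taken from the source


def zone_of(dec_deg, zone_height=ZONE_HEIGHT_DEG):
    """Map a declination in [-90, 90] degrees to an integer zone number."""
    return int(math.floor((dec_deg + 90.0) / zone_height))


def build_indexes(objects, zone_height=ZONE_HEIGHT_DEG):
    """Return (zone index, id lookup) for an iterable of (id, ra, dec)."""
    zones = defaultdict(list)
    by_id = {}
    for obj_id, ra, dec in objects:
        zones[zone_of(dec, zone_height)].append((ra, dec, obj_id))
        by_id[obj_id] = (ra, dec)  # hashtable keyed by objectId
    for entries in zones.values():
        entries.sort()  # order each zone by ra to allow range scans
    return dict(zones), by_id


objects = [(1, 10.2, -0.1), (2, 10.0, -0.3), (3, 10.1, 0.2)]
zones, by_id = build_indexes(objects)
```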


Compute phase: inputs/triggered by
This phase is triggered by an event coming from the Image Processing & Detection Pipeline that notifies the Association Pipeline (AP) that the DIASources for a FOV are ready. The following information is expected:
  • FOV id for the event so any events not relevant to the FOV being worked on can be filtered out (may not be necessary if pipeline framework can deal with this for us)
  • location of newly detected DIASources (1 or more names of database tables or files)
  • desired match distance between new DIASources and Objects, based on seeing conditions (we can hardcode this for DC2 if necessary)
Stage 1
Load newly detected DIASources into memory, and build a spatial index for them. Execute a distance based cross-match between newly detected DIASources and Objects. Flag all newly detected DIASources that did not match a known variable Object.
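The distance-based cross-match and the flagging step can be illustrated with a brute-force sketch. It deliberately skips the spatial index and compares every DIASource with every Object; the record layout, the "variable" field, and the function names are assumptions made for the example.

```python
import math


def angular_sep_deg(ra1, dec1, ra2, dec2):
    """Great-circle separation in degrees (haversine formula)."""
    ra1, dec1, ra2, dec2 = map(math.radians, (ra1, dec1, ra2, dec2))
    h = (math.sin((dec2 - dec1) / 2) ** 2
         + math.cos(dec1) * math.cos(dec2) * math.sin((ra2 - ra1) / 2) ** 2)
    return math.degrees(2 * math.asin(min(1.0, math.sqrt(h))))


def cross_match(sources, objects, radius_deg):
    """Return ({source id: [matching object ids]}, flagged source ids).

    A source is flagged when none of its matches is a known variable Object.
    """
    matches, flagged = {}, set()
    for sid, sra, sdec in sources:
        hits = [o for o in objects
                if angular_sep_deg(sra, sdec, o["ra"], o["dec"]) <= radius_deg]
        matches[sid] = [o["id"] for o in hits]
        if not any(o["variable"] for o in hits):
            flagged.add(sid)
    return matches, flagged


objects = [
    {"id": 1, "ra": 10.0, "dec": 0.0, "variable": True},
    {"id": 2, "ra": 20.0, "dec": 5.0, "variable": False},
]
sources = [(100, 10.0001, 0.0), (101, 20.0, 5.0001), (102, 50.0, 50.0)]
matches, flagged = cross_match(sources, objects, radius_deg=0.0003)
```

Here source 100 matches a variable Object and is not flagged, while 101 (matches a non-variable Object) and 102 (no match) are flagged.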
Barrier, wait for event from NightMOPS
Before moving on to the next stage, wait for an event from NightMOPS signalling that it has finished predicting moving object positions/magnitudes for the FOV. The following information is expected:
  • FOV id for the event
  • location of predicted moving object positions (1 or more names of database tables or files)

NB Stage 1 is quite fast, so we could move the wait for this event to the beginning of the phase (before stage 1) without losing much time.

Stage 2
Load predicted moving object positions into memory. Cross-match the error ellipses of predicted movers against new DIASources that didn't match a known variable Object. Any conclusively matching DIASources are removed from further consideration for alerting.
Stage 3 [barrier after Stage 2 may be required]
Create packets of data to send off to the alert generation pipeline. Each packet will consist of at least: 1 newly detected DIASource, all matching Objects, and all historical DIASources associated with the matching Objects. If alert generation wants more, we could additionally include e.g. all Objects/historical DIASources in the neighbourhood of the newly detected DIASource. Using the data from the cross-matches/alert generation packets, Objects are updated and new ones are created (any newly detected DIASource that did not match any Object or predicted mover is inserted into the in-memory Object catalog). All new DIASources are inserted into the in-memory historical DIASource catalog, and the historical DIASource catalog is trimmed of old DIASources (for any given Object, a maximum of 10 historical DIASources are kept for examination by Alert Generation, AG).
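The trimming rule (at most 10 historical DIASources per Object) could be kept with a bounded queue per Object. This sketch assumes DIASources are appended in time order, so that the oldest entry is the one dropped; the names history and add_dia_source are hypothetical.

```python
from collections import defaultdict, deque

MAX_HISTORY = 10  # limit stated in the text

# One bounded queue of diaSourceIds per objectId.
history = defaultdict(lambda: deque(maxlen=MAX_HISTORY))


def add_dia_source(object_id, dia_source_id):
    """Record a new DIASource; the oldest one is dropped once the queue is full."""
    history[object_id].append(dia_source_id)


for dia_source_id in range(12):
    add_dia_source(42, dia_source_id)
# history[42] now holds diaSourceIds 2 through 11.
```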

NB post DC2, it is possible that Alert Generation will run within this phase. If, for example, AG wants to update Objects, it may be convenient to embed AG into the AP. Otherwise, the results will be passed to the Alert Generation Pipeline (an event with the location of the alert generation data packets will be issued).


The third phase writes the updated Object and historical DIASource data back to disk.


The three inputs are:

  • Event from the telescope control system indicating the FOV to be observed next.
  • List of DIASources within the FOV from the Detection Pipeline.
  • List of predicted moving object positions/magnitudes from the Moving Object Pipeline.


The two outputs are:

  • Cross-match results for the Alert Generation Pipeline.
  • Updated Objects and DIASources (for the DQA system).

Association Pipeline Option 1: in-application, 1 pipeline per FOV

The empty boxes represent stages; a slice corresponds to one vertical column of boxes.

Association Pipeline Option 2: in-application, 3 pipelines per FOV

The main advantage of this approach is that each phase can be parallelized at a different granularity and that things can be organized so that events are only needed/issued at the beginning/end of pipelines (rather than stages). For DC2 in particular, this approach would make the AP implementation easier since the Compute phase could be implemented on a single slice and wouldn't have to deal with communicating intermediate results across slices. For this same reason, so long as Compute can/will run on a single machine, it makes a lot more sense to implement AP parallelization with threads rather than processes (slices). One could perhaps imagine a mode of the pipeline framework where slices are threads rather than processes ...

Association Pipeline Diagram 2

Note that fewer stages are required. This is mainly because there are fewer places where data needs to be synchronized among slices, and also because some functionality moves (index building would probably happen in Compute rather than Load).

Association Pipeline Option 3: in-dbms

If we run the association pipeline in the DBMS, we still have pretty much the same options as above. The only difference is that the various slices will be very lightweight, each simply issuing independent queries to a MySQL database for pretty much everything (each client connection is assigned 1 MySQL server thread).

Design Options

A brief description of the design options (in terms of parallelization/integration into the pipeline framework) follows.

Multiple Pipelines [Selected For DC2]

We could instantiate multiple Pipelines, up to the number expected to be simultaneously executing. The Pipeline Framework would have to direct each event from the telescope control system to an idle Pipeline. In addition, and with more difficulty, the framework would have to direct the Detection Pipeline output to the same instance of the Association Pipeline.
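The routing requirement described above can be sketched as a small bookkeeping class: a telescope event claims an idle Pipeline, and the later Detection Pipeline event for the same FOV is sent to that same instance. The class and method names are hypothetical; the Pipeline Framework does not necessarily provide anything like this.

```python
class PipelineDispatcher:
    """Hypothetical router that pairs each FOV with one Pipeline instance."""

    def __init__(self, n_pipelines):
        self.idle = list(range(n_pipelines))  # indices of idle Pipelines
        self.by_fov = {}                      # FOV id -> Pipeline index

    def on_telescope_event(self, fov_id):
        """Claim an idle Pipeline for a new FOV and remember the pairing."""
        if not self.idle:
            raise RuntimeError("no idle Pipeline instance")
        pipeline = self.idle.pop(0)
        self.by_fov[fov_id] = pipeline
        return pipeline

    def on_detection_event(self, fov_id):
        """Route Detection Pipeline output to the instance that owns the FOV."""
        return self.by_fov[fov_id]

    def on_finished(self, fov_id):
        """Release the Pipeline once the FOV has been fully processed."""
        self.idle.append(self.by_fov.pop(fov_id))


dispatcher = PipelineDispatcher(n_pipelines=2)
first = dispatcher.on_telescope_event(fov_id=1)
second = dispatcher.on_telescope_event(fov_id=2)
same_as_first = dispatcher.on_detection_event(fov_id=1)
```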

This option violates an assumption that Jeff believes is in the design of the pipeline framework, namely that all parallelism is handled by Slices.

M*N Slices

We could instantiate one Pipeline with M*N Slices, where M is the number of simultaneous executions and N is the degree of parallelism desired. In this case, the Pipeline control code would have to be modified in two ways:

  • It would have to be able to wait for events even while executing Stages. This might require it to create a new thread of execution for each triggering event.
  • It would have to direct a given FOV to a subset of the Slices instead of all of them.
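The second modification amounts to mapping each simultaneous execution to its own group of N Slices. One possible mapping, assuming contiguous groups and a hypothetical helper name, is:

```python
def slices_for_execution(execution_index, n, m):
    """Return the Slice indices used by one of m simultaneous executions.

    Assumes the M*N Slices are numbered 0..m*n-1 and grouped contiguously.
    """
    if not 0 <= execution_index < m:
        raise ValueError("execution_index must be in [0, m)")
    start = execution_index * n
    return list(range(start, start + n))


group = slices_for_execution(1, n=4, m=3)
```

With N = 4 and M = 3, the second execution (index 1) would use Slices 4 through 7.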

N Slices

We could instantiate one normal Pipeline if multiple threads of execution can be maintained simultaneously. The triggering event would spawn a thread not only in the master process, which would then wait for more events, but also in each Slice. While it may seem that modifications to the Slice code could be avoided by having the Stage process() methods spawn a thread each time they are called, this fails because separate synchronization barriers are needed for each master thread. In this case, no FOV partitioning changes would need to be made.

This seems like the preferred option.
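The need for a separate synchronization barrier per master thread in the N Slices option can be illustrated with plain threads. In this sketch everything runs in one process, which is a simplification: the real Slices are separate processes, and the function name and log format are assumptions.

```python
import threading


def run_fov(fov_id, n_slices, n_stages, log, lock):
    """Run all stages for one FOV using a barrier private to that FOV."""
    barrier = threading.Barrier(n_slices)

    def slice_thread(slice_id):
        for stage in range(n_stages):
            with lock:
                log.append((fov_id, stage, slice_id))
            # Only the slice threads of this FOV meet at this barrier,
            # so a slow FOV does not hold up another one.
            barrier.wait()

    threads = [threading.Thread(target=slice_thread, args=(s,))
               for s in range(n_slices)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


log, lock = [], threading.Lock()
# One "master thread" per triggering event, two FOVs in flight at once.
masters = [threading.Thread(target=run_fov, args=(fov, 3, 2, log, lock))
           for fov in (1, 2)]
for m in masters:
    m.start()
for m in masters:
    m.join()
```

Within each FOV, every stage 0 entry is logged before any stage 1 entry, while entries from the two FOVs may interleave freely.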