Last modified 6 years ago Last modified on 10/14/2013 07:19:52 PM

Bosch's Refactoring Thoughts: Task Framework and Parallelization

Comments are welcome, but please add them in italics and prefix with your initials or username; insert them directly after the text you want to comment.

This page is about the Task framework (i.e. pipe_base), not the hierarchy of Tasks we current have (i.e. pipe_tasks, etc). I think the Task hierarchy also needs some work, but I don't plan to write a page on that; I think it just needs a careful audit for duplication and readability, and a refactoring that preserves basically all the code while just shifting some of it around. Most importantly, I think that work is almost entirely orthogonal to what I'm describing here.

I have some thoughts of maybe adding a straw-man design to this page at some point, but I'll put that off for later, as I don't want to go too far on this without getting some feedback.

Repo-Wide Configs and Schemas

Tickets #2723 and #2734 (which made Task configs and table schemas independent of data ID) had a much bigger impact on the overall framework than I anticipated, and they threw a bit of a wrench into parallelization. While the implementation was somewhat hurried, I'm still confident that this is behavior we want, but we need to deal with the ramifications of that change w.r.t. race conditions and reimplement at least some of it.

I think there are basically three options here:

  • We can make the config/schema writing and comparision code robust against race conditions at a low level. I think this requires more from the butler, and some tricky multiplatform locking code I don't personally feel that I'm qualified to write. In this model, we'd continue to write configs and schemas when the Task is first run within a repo, and compare them on all subsequent runs of that Task. When running in parallel, which process writes could then be random, and the low-level locking code would enforce the requirement that other processes not do any comparison until the writing is complete.
  • We can make explicit hooks for the config/schema write/compare code that allow the parallelization middleware to ensure it is not invoked in parallel. Later in this page, I'll argue that we'll want more parallelization hooks for other reasons. If one of these hooks can guarantee that the code within it is executed only once before the parallel code, this does everything we need: we can invoke the config/schema write/comparison within it, writing the config/schema once the first time a parallel job is launched, and comparing once each time the same Task is re-launched, regardless of how many jobs are run in parallel. This is the approach currently used to solve the problem in the "-j" multiprocessing and HSC parallelization approaches.
  • We can make configuration an entirely separate step from running the pipeline: in production mode, it would make sense to define the configuration for all Tasks in advance, when the output repo is first created. When the Tasks are run later, their configuration cannot be changed, and it's simply loaded from the configuration persisted in the repo when it was created. It also has the advantage of providing a point at which the configurations for all tasks could be validated together, which is currently a minor hole in the pipeline's validation abilities. I'm unsure as to whether this two-step approach could be made convenient enough to work well in development mode, in which the configuration needs to change frequently between runs (i.e. the current --clobber-config mode). But I'd be curious to see a more-fleshed out version of this approach.

In addition, we need a bit more functionality for this feature, regardless of how we resolve the issues with parallelization. It is frequently necessary for a Task to have access to the configuration-dependent schemas of previous Tasks to determine its own schema; a forced-measurement Task, for instance, may need the schema of its reference catalog in order to define its own. Having access to a previous Task's config may also provide a way to do cross-task config validation. Currently, this is impossible to do in a Task constructor as a Butler is not available at that point, and instead we have to define the schema on first use. Making a Butler available to the Task constructor would be a fine solution to this problem, if it doesn't raise problems with parallelization in other respects (I'm not sure if it does).


Each new command-line task currently requires at least two files - an importable Python module where the Task and config are defined, and a pure-boilerplate bin script that does nothing but import the task and call a method on it. Furthermore, the bin script is required to have a slightly different name from the Task class itself because of our naming conventions, and while experienced developers can easily translate from one form to the other, I believe this is an unnecessary distraction for new users; we don't want to have to educate them in how our particular flavor of camelCase naming works for them to be able to follow our code, or understand documentation that uses different names for the Task class and its bin script.

I would much prefer to have a Task-invocation script that accepts the full Python path of the task to execute, i.e.: lsst.pipe.tasks.ProcessImageTask <input> ...

even though this is longer than the current: <input> ...

We could also have multiple such invocation scripts:

  • Different scripts for different kinds of parallel execution (i.e. or; more on this below.
  • We could have scripts that print information about a Task instead of run it; I find a driver much more intuitive than the current "--show config,exit" interface.

Fine-Grained Parallelization

Our current parallelization framework only allows data parallel operations in units of Butler data IDs. It also ties the top-level Task classes to the parallelization axis, not the algorithmic content, because a top-level Task can have only one parallelization axis. I'm not sure all of the functionality I'll describe here is feasible, especially in the short term, as it would represent a significant expansion of the middleware functionality, but I'll describe it anyway.

I'll start by focusing on two parts of the pipeline that I feel are limited by the current parallelization scheme, in different ways:

  • In multifit, we spend so much time processing each Object that the development cycle is essentially days, even when processing a single patch, for certain kinds of development. To be sure, this is a much bigger problem in development than it will in production, because we're trying to do much more per Object and that's a big reason it's so much slower. This sort of development won't end anytime soon, however, and even in production I would guess that relying on naive patch-level parallization could result in a lot of idle time due to differences in the time spent processing different patches. The algorithm is still strictly data-parallel, but over Objects, which don't correspond to a Butler data ID. We'd also need to be able to do a small amount of processing prior to the parallel iteration over Objects.
  • Our coaddition process is currently split up into two Tasks (makeCoaddTempExp and assembleCoadd), both of which use patches as their parallelization axis. In both cases, we could be parallelizing at another level: makeCoaddTempExp mostly consists of a loop over visits, which is executed in serial, and assembleCoadd mostly consists of a loop over sub-patch sky regions, which is also executed in serial. When coadding a single patch with many visits, parallelization over these would dramatically speed things up.

Both of these use-cases are much more about development than production, and represent a desire to use a large amount of computing power to process a small piece of the sky very quickly. As such, it may be that we only need this sort of fine-grained parallelization implemented using a multiprocessing or multithreading approach that would be limited to a single node with many cores, rather than a large cluster with many nodes. But I'm concerned that making the multi-core parallelization API significantly different from the multi-node parallelization API could be just as bad as adding all the complexity this implies to the multi-node parallelization API. Or perhaps we should consider these two parallelization modes to always be nested, and somehow enshrine an approach like the one I believe the middleware people have used recently, using "-j" multiprocessing with large-scale production runs; we could add finer-grained parallelization at the multiprocessing level and only allow data ID parallelization through condor (etc.).

As an API, I'm envisioning some kind of "scatter-execute-gather" callbacks in a Task, which would then be called appropriately by the driver script (see Boilerplate, above):

  • The "scatter" method would be called once, and return a list of things to be parallelized over.
  • The "execute" method would be called in parallel, once for each element in the list returned by "scatter".
  • The "gather" method would be called once, with a list of all the return values from "execute".

There's obviously a lot of open questions here, starting from how the interprocess communication would be handled, and it would certainly add some new complications relating to the fact that each process would actually have its own Task instance; we'd have to be more vigilant about using Task attributes to pass data between methods, and about the serializability of function arguments. How this would play with subtasks is also a big open question, and while I think it'd be a big step forward even if this scatter-execute-gather approach was only available to the top-level Task, I'm not certain that's all we'll want.

It's worth noting that we effectively have a clunky "scatter-execute" (with no "gather") available in the current Task framework, which involves subclassing TaskRunner and putting the scatter code there. Because there's no "gather" step to do the output, however, we're still limited to parallelization over data IDs, because we need to be able to persist in the "execute" method. If we do have a chance to work on the Task framework and parallelization, I believe we should reconsider the TaskRunner design, and try to put as many of its hooks as possible in the Task class itself (probably as class methods for the most part).

Parallel Random Number Generation

I don't believe any of our current parallelization framework provides the necessary hooks to run algorithms that rely on pseudorandom numbers in parallel. I'm not really sure what the best approach for this would be, but at a minimum (at least for a simple approach) I believe it would be necessary for each Task instance to be given a distinct numeric ID that could be used when forming a seed for an RNG. I don't think it's necessary that a complete parallel run be exactly reproduceable in terms of the random numbers used. That's probably impossible if we want to have any kind of dynamic loading, unless we have a very fancy parallel RNG scheme. But it'd be nice if we could reproduce the work that was done by a single core in such a run based on the metadata and logs, even if that's a somewhat manual process.

Output Definitions

Another important issue for the Task framework that's unrelated to parallelization is the definition of output datasets. These are currently the responsibility of the Mapper, but I think this should be the responsibility of the Tasks, possibly using some predefined substitution strings defined by the Mapper. This is the only way I can think of to solve the current need for each Mapper to enumerate all outputs of all tasks, which I consider my #1 problem with the current stack. There's a bit more on this on this page.