Workflow Manager Technology Down Select for ALMA Demonstrator - casangi/RADPS GitHub Wiki

Technologies to Consider

For the ALMA Demonstrator, we are considering two workflow managers:

  • Airflow (stage, task)
  • Prefect (stage, task)

workflow

Selection Criteria

Benchmarking Results

Benchmarks were designed to evaluate the efficiency of scheduling capabilities by comparing Prefect, Airflow, and Dask. We included Dask because it is lightweight relative to Prefect and Airflow, providing a baseline for optimal performance. Additionally, we chose to use the Dask task runner for Prefect.

The benchmark consisted of n_tasks executed in parallel, each running a sleep function that paused for a duration drawn from a uniform distribution. After sleeping, each function returned a NumPy array of size data_size_mb. These arrays were then summed using a binary tree reduction. The parameters varied:

    list_sleep_times = [(1.0,0.1),(6.0,4.0)]
    n_threads_per_process = os.cpu_count() #16 cores
    list_n_tasks = [1000, 2000, 4000, 8000, 16000, 32000, 64000, 80000, 128000]
    data_size_mb_list = [0.1,10.0] #MB

The benchmarks were run on the Gravitas workstation, which has 16 cores and 64 GB of RAM. For the 10.0 MB return size and 128000 tasks, the scheduler must correctly perform the binary tree reduction; otherwise, it would run out of memory, since the total memory requirement in this case is 1.28 TB.

The percentage overhead was calculated using:

100*(t_workflow - t_sum_task_times/n_parallelism)/(t_sum_task_times/n_parallelism)

where

  • t_workflow is the time it took for the workflow to complete
  • t_sum_task_times is the total of all the sleep times
  • n_parallelism is the number of parallel executions happening (16 in this case).

The first plot shows the percentage overhead as the number of tasks varies, with a per-task return size of 0.1 MB. Airflow was only able to execute 1000 tasks due to a limitation in the XCom backend. A possible workaround is to use Redis; however, we were unable to get it to work. For Prefect, the percentage overhead remained around ~6%, while for Dask it was ~0.5%. Since we use Dask as the Prefect runner, Prefect introduces an additional ~5.5% overhead.

fig6_num_tasks_overhead

When the return size was increased to 10 MB, Prefect could only handle 8 000 tasks, most likely due to an interaction with its database.

fig5_num_tasks_overhead

Another issue we encountered is that although Prefect could complete 128000 tasks with a 0.1 MB return size, the dashboard was no longer real-time, as the database tracking task states took hours to update, even after the workflow had completed. In the prudent-walrus run below, the workflow shows as completed, but “0 Task runs” have been registered.

Ease of implementing "context like" functionality

The Pipeline context stores domain-specific context information across task boundaries and beyond workflow completion: For example:

  • Calibration state
  • Cached MS metadata
  • Synthesized beams
  • Imaging mode, imaging parameters, clean list
  • Image mitigation settings
  • Virtual spws, spw mapping info

Workflow-centric, non-domain "context" information such as task execution time, current stage of processing, and the parameters used for tasks is tracked by both Airflow and Prefect. Both store this information in databases which can be accessed via API.

Both frameworks are organized around a REST API (airflow API, prefect API), and both frameworks support interface to this API via some implementation of their own runtime context (airflow context, prefect context). This could enable future implementations of the Pipeline context that make direct use of the framework API to track and expose metadata about processing workflows.

For both Prefect and Airflow, the context-like functionality was modeled using a lightweight software layer to handle saving, updating, managing, and retrieving the context which was backed by a PostgreSQL database. While both platforms successfully handled this functionality, the Airflow implementation featured a simplified context design compared to Prefect, partly attributable to the developer experience challenges outlined in the "Developer Experience" section.

Workflow Error Handling and Retries

Both Airflow and Prefect have built-in functionality for automatically retrying failed tasks. The number of retries and other “tuning” parameters can be specified in each. Prefect has the ability to pause a workflow based on an error condition and wait for user input to proceed.

Airflow has an explicit concept of Service Level Agreements (per DAG), a measure of success against expected runtime parameters, integrated with automated alerting. However, this interface was deprecated as of the Airflow 3.0 release -- although a replacement is planned for 3.1. Prefect has a similar capability which is currently documented as an experimental feature only available in Prefect Cloud.

State types

Airflow Prefect
scheduled Scheduled
queued Pending
running Running
success Completed
restarting Retrying
failed Failed
up_for_retry AwaitingRetry
skipped
upstream_failed
up_for_reschedule
deferred
removed
none
Cached
Crashed
Cancelled
Cancelling
Paused
RolledBack

Interface

Both frameworks have web interfaces, including interactive dashboards via which one can view the current status of any running workflows, identify successes and failures, and view logs. Both frameworks have nice connected graph views of task relationships in the UI. Both frameworks allow for either command line or dashboard interface to operations such as registering jobs with the scheduler, spawning parameterized instances of workflows, and adjusting automation properties.

Prefect displays both events and artifacts as clickable “dots” which can be interacted with when the flow run is viewed. Prefect has a variety of different bar charts in the Task and Flow Run panels. There seemed to be a limitation of rendering more than a few thousand items in the dashboard for a single flow first encountered as a "load on demand" message, and then high latency and timeouts.

Airflow displays each DAG and also previous DAG versions. The top level dashboard also contains the health status of the Airflow components (i.e. metadata database, scheduler, triggerer, and dag processor). Airflow has a nice “version/completion” matrix to show how stages of a DAG have progressed over time.

Interactivity

The RADPS Memo 6 describes in the example workflow describes some checkpoints may involve human interaction to intervene the automated workflow to check status and prior outputs. User interaction in solver loops is also listed as additional items to consider in evaluating a prototype workflow in the memo.

Prefect provides functionality to trigger events, pause or suspend an automated flow, and wait for input. Interactivity in imaging stage flows (cube imaging, per_spw_cont imaging) via interaction of the Prefect UI was demonstrated. A simple set of iteration control parameters mimicking deconvolution process was controlled by pause_flow_run to interact through the Prefect UI. The pause_flow_run is primarily for interactions within Prefect (UI or other flows). While it was not tested during Sprints, Prefect also have send_input or receive_input which can be communicated with an external visualization package such as Bokeh via WebSockets or REST API calls.

Additionally, we demonstrated the ability to pause the workflow based on a condition being met and wait for user interaction to review whatever triggered the pause to decide whether to continue the run or now. This used a combination of pause_run_flow and Prefect's emit_event

In Airflow, we did not explore a similar type of the interactivity with Airflow since Airflow is primarily designed as an orchestration tool for batch-oriented workflows and tools to create human-in-the-loop processes are not available at the moment. However, such use cases have been requested from the user community and made into an Airflow Improvement Proposal(AIP) (see AIP-90). That particular proposal appears to be accepted so we expect to have such functionality in the future release of Airflow.

Developer experience

  • For both Prefect and Airflow, we evaluated them using the recent major releases (Prefect 3.x, Airflow 3.0.x), which had significant changes compared to the previous releases.

  • For Prefect, writing workflow code is similar to normal coding with Python. Turning python functions to Prefect flow or task is very simple with use of decorator. For Airflow, since a workflow is defined as a DAG, a greater care is needed to how to structure the code.

  • Longer develop-to-debug loop in Airflow: deployment is required to fully test DAGs due to airflow-specific functionality that can only be accessed when running DAGs via the Airflow runner. Additionally, it's not possible to run airflow DAGs using an interactive debugger the way one can with Prefect.

  • Silent, difficult-to-debug DAG parsing failures in Airflow

  • Airflow: DAG files need to be a single dag folder, which can be inconvenient during development/testing phases but it can be plus for deployment. Additionally, modules shared between DAGs are typically stored in a plugins/ directory which is not always pulled when DAGs are re-parsed which requires additional management.

  • Since Airflow uses XComs for cross-tasks communications and there is size limitation for what XCom can handle which depends on metadata database used. In our benchmark testing of airflow, we encountered 'unmappable_return_value_length' error relatively quickly. Also XComs require serialization of the data transfer between tasks. Therefore for certain data types such as a numpy array can introduce overhead for serialization and deserialization.

  • Both frameworks supply deployments that allow for local and distributed scheduling servers. Prefect allows configurable profiles via simple dot files and environment variables, e.g., to connect with different scheduler addresses for local and remote development.

Decision

After evaluating all criteria, we recommend adopting Prefect as the primary workflow manager for the ALMA Demonstrator, handling both stage and task level orchestration while leveraging Dask for fine‐grained parallelism. Prefect’s intuitive, Python-native API, robust context and state management (including pause/resume and retries), and rich REST driven UI meet our pipeline’s complexity needs, and by using Dask for fine-grained parallelism, we retain minimal runtime overhead and scalable, memory-efficient execution for tens of thousands of parallel tasks.