Airflow Workflow - casangi/RADPS GitHub Wiki
The example workflow is another implementation of the example RADPS Workflow Decomposition, composed of simple "workloads" (mostly sleeps) defined in DAGs, which are triggered by an overall DAG with TriggerDagRunOperator.
The example workflow decomposition is represented by a single parent DAG definition file: example_workflow.py. A version of this parent DAG is registered with the DAG processor and metadata database for each version of the file tracked by the airflow deployment's configuration.
The DAG definitions corresponding to each stage are referenced from this parent file. These consist of placeholder functions composed into a DAG using Airflow's taskflow API. In this simple example, the functions composed into the DAG are defined in the file itself, but they could also be imported from some other library or reference files tracked in another area of the repository.
The relationships between the stages of the example workflow are defined in the top-level DAG, and the relationships between tasks in a stage are defined in each of those sub-files. The scheduling, trigger rules, assets, and other configuration properties are configurable at the level of each individual task.
There is a built-in "Airflow Context" that stores Airflow orchestration metadata in Airflow's metadata database. It can be passed to every airflow task using `**airflow_context`. This contains:
- Execution tracking: task status, execution time, `run_id`s and state ('success', 'failure', etc.), and retry counts
- Return values from xcoms (via task instances)
- Task dependencies in a dag (via DAG objects)
- Parameters/configuration used to run each task/dag/etc
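As a rough illustration (this is not the real Airflow context object; the keys shown are a subset and the values are made up), the context behaves like a keyword-argument mapping that Airflow injects into each task:

```python
# Illustrative shape of the orchestration metadata carried in the Airflow
# context. In a real deployment Airflow builds and injects this mapping;
# here it is mocked by hand purely to show the access pattern.
airflow_context = {
    "run_id": "manual__2025-01-01T00:00:00",   # execution tracking
    "dag": "example_workflow",                  # a DAG object in real Airflow
    "task": "import_data_and_prep",             # task identifier
    "ti": {"state": "success", "try_number": 1},  # task-instance state/retries
    "params": {"simulate_failures": False, "failure_rate": 0.0},
}

def my_task(**airflow_context):
    # A task receiving the context as keyword arguments can read any field.
    return airflow_context["run_id"]
```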
In addition to this orchestration metadata, the pipeline needs somewhere to store domain-specific context, for example:
- Calibration state
- Cached MS metadata
- Image lists that imaging stages should produce
- Synthesized beams
- Observing run metadata
- Imaging mode, Imaging parameters, clean list
- Image mitigation settings
- Virtual spws, spw mapping info
Following the approach of the context domain object in the existing pipeline, a "context" functionality was introduced to the demo pipeline to represent saving state across tasks. Right now this just saves a single QA score for each stage. In the Airflow Workflow code, this is defined in plugins/pipeline_context.py and is interacted with via the load_pipeline_context() and save_pipeline_context() functions.
The context is designed as a thin wrapper which saves/restores information from a database which stores information relevant to future stages. Each stage reads the relevant information out of the database to fetch the current state, and then saves its results to the context at the end of its execution:
- `setup_db` is called in `import_data_and_prep` to initialize the context database
- `load_pipeline_context` loads the current context
- `save_pipeline_context` saves an input dict and stage name to the context
This was tested locally using a postgres database.
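A minimal sketch of this save/restore pattern, using sqlite3 in place of the Postgres database so it is self-contained. The function names mirror those in plugins/pipeline_context.py, but the schema and signatures here are assumptions for illustration only:

```python
# Sketch of the pipeline-context pattern: each stage loads prior state from
# a database and saves its results (e.g. a QA score) at the end of execution.
# sqlite3 stands in for the Postgres database used by the demo pipeline.
import json
import sqlite3

def setup_db(path):
    """Initialize the context database (done in the first stage)."""
    with sqlite3.connect(path) as con:
        con.execute(
            "CREATE TABLE IF NOT EXISTS context (stage TEXT PRIMARY KEY, payload TEXT)"
        )

def save_pipeline_context(stage, data, path):
    """Save an input dict under a stage name."""
    with sqlite3.connect(path) as con:
        con.execute(
            "INSERT OR REPLACE INTO context (stage, payload) VALUES (?, ?)",
            (stage, json.dumps(data)),
        )

def load_pipeline_context(path):
    """Load the current context: a mapping of stage name -> saved dict."""
    with sqlite3.connect(path) as con:
        rows = con.execute("SELECT stage, payload FROM context").fetchall()
    return {stage: json.loads(payload) for stage, payload in rows}
```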
By definition, a DAG is acyclic, and Airflow workflows are built around that core concept. Therefore, if a workflow requires sequential loops, care has to be taken in the design to ensure proper task dependencies and preserve the execution order of the loop. We experimented with the solver process, as previously done with Prefect, to see whether the nested loops required by the solver can be achieved easily in Airflow.
The figure below shows the solver DAG. The nested loop parameters (indices) are pre-generated in the generate_process_list task, and the list of parameters is passed to the execute_single_solver task, which runs concurrently using Airflow's dynamic task mapping feature. Each execute_single_solver instance contains an entire iteration loop for one parallelization index; in this example, a channel.
The cube imaging and per-spw continuum imaging DAGs make use of this solver DAG by triggering it with some arguments. The figure below shows the graph where the solver is triggered in image_target_cube using TriggerDagRunOperator. For simplicity, cube imaging of 10 spectral channels is assumed, so there are 10 concurrent execute_single_solver instances in the solver graph above.
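The fan-out pattern described above can be sketched in plain Python (no Airflow dependency; the task names mirror the DAG, but the solver loop body and channel counts are placeholders):

```python
# Plain-Python sketch of the pattern Airflow's dynamic task mapping provides:
# one task emits a parameter list, and a mapped task runs once per entry.
from concurrent.futures import ThreadPoolExecutor

def generate_process_list(n_channels):
    # In the DAG, this task pre-generates the nested-loop indices.
    return [{"channel": c} for c in range(n_channels)]

def execute_single_solver(params):
    # Each mapped instance runs the entire iteration loop for its channel.
    result = 0
    for _iteration in range(3):  # placeholder for the solver's inner loop
        result += params["channel"]
    return {"channel": params["channel"], "result": result}

def run_solver(n_channels=10):
    process_list = generate_process_list(n_channels)
    # The thread pool stands in for Airflow running the mapped tasks concurrently.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(execute_single_solver, process_list))
```

With n_channels=10 this produces 10 concurrent execute_single_solver calls, matching the 10-channel example in the solver graph.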
- Set up Airflow. Recommendation: use the docker setup described in one of the airflow tutorials.
- Clone the repo and update the Airflow configuration file `airflow.cfg` to use the `airflow_workflow/dags/` directory in the repo as its `dags_folder`.
- Enable the DAG in the Airflow UI, and it will run on the schedule defined in the definition file. Optionally, re-parse and trigger the DAG via the command line or UI.
To stop running DAGs of a specific dag id from the command line:
- Check that the DAG is actually running: `airflow dags list-runs --state running dag_id`
- Pause the DAG to prevent it from re-launching: `airflow dags pause dag_id`
- Kill all running DAGs with that dag id: `airflow dags delete dag_id --yes`
Both of the following errors in code will cause DAGs to fail to import (often silently)!
- Missing or circular imports
- Python syntax errors
If DAGs are not showing up in airflow or not updating, try the following:
- Check to see if dag is listed
airflow dags list | grep dag_name
- If using docker, check the actual file in the container to see if it's the one you expect:
docker-compose exec airflow-scheduler cat /opt/airflow/dags/missing_dag.py
If it’s not there, see where /opt/airflow/dags is set up in the docker-compose.yaml
- To check if the DAG parsed correctly:
airflow dags list-import-errors
`No data found` means the DAGs parsed correctly and there weren't any DAG parse errors
- Try running the DAG directly to check for errors:
docker-compose exec airflow-scheduler python /opt/airflow/dags/missing_dag.py
- Restart airflow scheduler to force it to re-read the DAG files:
docker-compose restart airflow-scheduler
- If you modify a module which might significantly impact the functionality/performance of a DAG, it isn't automatically reloaded like the DAG files are; you may need to restart the airflow scheduler.
- Airflow's DAG versioning system doesn't account for dependency changes; it can silently continue using cached versions.
- Airflow tasks need to be run in the orchestrator to work properly (XCom, airflow variables, connections, airflow context), so the development/debug cycle is lengthened.
Instead of being able to directly locally make and test changes:
Make change -> run -> did it work? -> repeat
You need to run the DAG inside the orchestrator: Make change -> deploy -> kick off run or wait for it to trigger -> check logs -> did it work? -> repeat
- No breakpoint debugging capability.
There are documentation pages dedicated to creating, configuring, and managing deployments of Airflow using public helm charts:
- https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/production-deployment.html#helm-chart-for-kubernetes
- https://airflow.apache.org/docs/helm-chart/stable/index.html
The first step is installing the helm chart. This is straightforward using default values (note that the following commands presume prior configuration of the appropriate namespace/context):
helm repo add apache-airflow https://airflow.apache.org/
helm upgrade --install airflow apache-airflow/airflow
This installs all the components of an airflow deployment, which can be examined in the standard way to see all of the running pods and services created by the helm chart, with kubectl get pods && kubectl get svc:
NAME READY STATUS RESTARTS AGE
airflow-postgresql-0 1/1 Running 0 18m
airflow-redis-0 1/1 Running 0 7m28s
airflow-scheduler-687cbb9d6-bhxp4 3/3 Running 0 18m
airflow-statsd-75fdf4bc64-tdwcq 1/1 Running 0 18m
airflow-triggerer-0 3/3 Running 0 18m
airflow-webserver-6bb747bc64-btqcd 1/1 Running 0 18m
airflow-worker-0 3/3 Running 2 (8m3s ago) 18m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
airflow-postgresql ClusterIP <redacted> <none> 5432/TCP 4m25s
airflow-postgresql-hl ClusterIP None <none> 5432/TCP 4m25s
airflow-redis ClusterIP <redacted> <none> 6379/TCP 4m25s
airflow-statsd ClusterIP <redacted> <none> 9125/UDP,9102/TCP 4m25s
airflow-triggerer ClusterIP None <none> 8794/TCP 4m25s
airflow-webserver ClusterIP <redacted> <none> 8080/TCP 4m25s
airflow-worker ClusterIP None <none> 8793/TCP 4m25s
Port-forwarding is the standard way that the UI is exposed (kubectl port-forward svc/airflow-webserver 8080:8080 &), but something like traefik could also be configured to serve as an ingress controller to the dashboard. Another way of exposing the airflow-webserver component to external requests is to configure a load balancer (kubectl expose deployment airflow-webserver --port 8282 --target-port 8080 --name=airflow-load-balancer --type=LoadBalancer).
Now the UI should be accessible at port 8282 of one of the external IP addresses listed by kubectl get svc.
The next step is to register the example workflow DAGs with the scheduler. There are a few different ways to do this:
- using a simple cronjob or any other mechanism to sync DAGs and configs across your nodes, e.g., checking out DAGs from the git repo every 5 minutes on all nodes (docs)
- building docker images with the DAG source files included in them (docs)
- using git-sync to accomplish the same thing as the cron job, but using the sidecar pattern to make DAG source files available to pods in the deployment (docs)
- using the Kubernetes concept of persistent volume claims to attach mount storage pre-configured to hold the DAGs (docs)
To use the git-sync sidecar method, the deployment can be updated with some configuration changes to set the appropriate key:value mappings defined in the official values.yaml. These should eventually be in source control along with the other chart modifications, but for now they can be passed using --set at the CLI:
helm upgrade --install airflow apache-airflow/airflow --namespace airflow \
--set dags.persistence.enabled=false \
--set dags.gitSync.enabled=true \
--set dags.gitSync.repo=https://github.com/casangi/RADPS.git \
--set dags.gitSync.branch=main \
--set dags.gitSync.subPath=airflow_workflow/dags/
Now DAGs from this repository should register with the scheduler and be available to run in the deployment. In practice, we have seen DAG import errors in all DAGs containing `from airflow.sdk import dag, task`, which seems to be due to a fundamental incompatibility between the released helm chart (a release supporting Airflow 3.0 is still pending) and Airflow 3.0, against which we developed our example pipeline.
Another method, baking the DAGs into the docker image, was also pursued: the default image was extended and is publicly available on dockerhub as amcnicho/radps-airflow. However, the modified helm install fails with timeout errors.
helm upgrade --install airflow apache-airflow/airflow --namespace airflow \
--set images.airflow.repository=amcnicho/radps-airflow \
--set images.airflow.tag=3.0.0 \
--set images.airflow.pullPolicy=Always \
--set airflowPodAnnotations.random=r$(uuidgen) \
--set images.migrationsWaitTimeout=1000
...
Error: failed post-install: 1 error occurred:
* timed out waiting for the condition
In the absence of an accepted release of the official airflow helm chart compatible with the 3.0 API, it should be possible to test the deployment using the latest version of the charts on the main branch of the airflow public repository. Doing so results in a working deployment except for the "minor detail" that authentication against the api-server is not properly configured out of the box. There have been breaking changes to the auth management functionality. By default, the latest version of airflow/chart/values.yaml on main sets config.core.auth_manager: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager. The Flask AppBuilder auth manager is a provider and as the official docs say,
Switching to a different auth manager is a heavy operation and should be considered as such.
In the interest of just getting things working, we can override this default and set config.core.auth_manager: airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager. The new problem is that the default account-creation command (airflow users create ...) no longer works in Airflow 3 (see this doc update PR), so the create-user pod goes into a CrashLoopBackOff state and the user account creation does not propagate to the metadatabase.
- 3.0 release docs
- issue#49738 airflow 3.0 documentation update epic
- issue#51590 "airflow.sdk" public interface docs
- discussions
- milestones for 1.17 tracks pending 1.17 helm chart release
- PR#49923 "secrets not generated" (now fixed)
- issue#49896 list of webserver config deprecations in 3.0 UI
- issue#51923 webserver_config.py updates in helm chart
- issue#51813 testing of 1.17 helm chart release candidate
- milestones for 1.18 tracks 1.18 helm chart release
- issue#50994 "airflow helm chart not compatible with 3.x"
- issue#51304 "user creation fails"
The Airflow workflow can simulate failures. This mode is activated by adjusting the simulate_failures and failure_rate DAG parameters; simulate_failures defaults to False. DAG parameters have default values set in the code, but can be specified when triggering the DAG from the Airflow UI.

The number of retries and the amount of time between retries can be set for an individual task, for all tasks in a DAG, etc. When a task fails and is ready to be run again, this is how it looks in the airflow UI:

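The interaction between the failure-simulation parameters and retries can be sketched as follows (plain Python; the function names are illustrative, and the retry loop is a stand-in for Airflow's built-in retry handling):

```python
# Sketch: a task fails at random with probability `failure_rate` when
# `simulate_failures` is enabled, and a retry wrapper re-runs it up to
# `retries` times. Parameter names mirror the DAG params described above.
import random

def flaky_task(simulate_failures=False, failure_rate=0.3, rng=random):
    if simulate_failures and rng.random() < failure_rate:
        raise RuntimeError("simulated failure")
    return "success"

def run_with_retries(retries=3, **params):
    # Airflow performs this retry loop itself; shown here for illustration.
    for attempt in range(retries + 1):
        try:
            return flaky_task(**params)
        except RuntimeError:
            if attempt == retries:
                raise  # retries exhausted: the task ends in a failed state
```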
- Some pipeline context capability (beyond airflow's internal context)
- Unified QA score handling across all of the stage DAGs
- Interactive workflows / parameterizing DAG instances
- Error handling / break points
- Changing high level graph composition