Airflow Tutorial Notes - casangi/RADPS GitHub Wiki

Recommended path through the documentation

  1. Airflow Quickstart
  2. Architecture overview
  3. Security model
  4. Tutorial pages
    1. Fundamentals
    2. TaskFlow API
    3. Pipelining

There are various ways to install Airflow, and this pathway through the tutorial docs only exercises one of them (the PyPI installation). The docs explicitly note that the Quick Start deployment is "just for inspiration" and not intended to form the basis of a production deployment. The other installation methods are clearly presented along with their intended use cases and users, community support commitments, and expected competencies.

Key concepts

Core Concepts and architectural information about Airflow are presented in a dedicated section of the public docs.

The Airflow public docs are intentional about intended roles and audiences for categories of user interaction with the system. For example, the Security Model outlines the following roles:

  • Deployment managers
  • DAG authors
  • Authenticated UI users
    • Admin
    • Operations
    • Connection configurators
    • Audit log examiners
    • "Regular users"
    • Viewers

The components of the example workflows employed by the documentation are:

  • DAG file (workflow definition; composed of tasks, operators, and the dependencies between them)
  • DAG processor and associated metadata database
  • scheduler (task submission controller with an associated executor)
  • API server

DAGs (directed acyclic graphs)

DAGs are the fundamental unit representing a batch of recurring work in Airflow. DAGs could feasibly implement Workflows from the Conceptual Architecture. They can also be bundled into collections that share conceptual or deployment concerns.

The schedule for a DAG is part of its definition.

Tasks

Tasks in Airflow are equivalent to Tasks in the Conceptual Architecture, and represent the granular pieces of work that compose to form a DAG. Tasks can be conditionally executed at runtime. Tasks have dependencies on other tasks, which constrain runtime execution. Dependencies can be declared in various ways, e.g. task1 >> task2 >> task3.

Types of Task:

  • Operators - predefined task templates
  • Sensors - operators designed to wait for [something]
  • TaskFlow - custom-defined Python functions with a @task decorator

Executors run Task instances. Example: Kubernetes Executor

XComs (short for cross-communications) can be used to pass information between Tasks in a DAG run, including return values. They are designed for small amounts of information, and are stored by default in the Airflow database, but the backend can be swapped out to use an object store. Note that the TaskFlow decorators use XComs to pass values between tasks in a DAG under the hood.

Interesting note - Airflow allows the creation of a virtual environment for a single task run.

Concurrency

Concurrency is handled by the scheduler. Behavior can be tuned via the global concurrency settings in the Airflow configuration file (airflow.cfg), per-DAG controls (max_active_tasks), per-task controls (max_active_tis_per_dag), or per-pool settings that cap the number of concurrently running tasks.

Retries and Error-handling

  • Number of retries can be set per-task, i.e. @task(retries=3)
  • on_failure_callback can be used to set a function to call when the task fails
  • "Trigger rules" can be used to run tasks under specific circumstances, like if one of its upstream tasks has failed.

Scheduling

"The Airflow scheduler monitors all tasks and dags, then triggers the task instances once their dependencies are complete. Behind the scenes, the scheduler spins up a subprocess, which monitors and stays in sync with all dags in the specified DAG directory. Once per minute, by default, the scheduler collects DAG parsing results and checks whether any active tasks can be triggered."

In addition to scheduling runs based on time, Airflow can also schedule runs when an Asset is updated or based on an event.

Code and Debugging tips

  • Logs are available per-DAG and per-run under $AIRFLOW_HOME/logs, organized by DAG id.
  • There is no need to run python dag_name.py - as long as it's located in the folder Airflow expects to find DAGs in, airflow will load it on its own.
  • airflow dags list can be used to see if your DAG has been recognized by Airflow
  • airflow dags test [dag_id] will run task instances locally, output their logs to stdout, and not track state in the database.
  • airflow dags show [dag_id] will create a graph of the dag
  • airflow dags list-import-errors will list errors generated when airflow attempted to parse a dag

Things that can cause silent dag failures - no errors, but also no parsed dag to run, doesn't show up in airflow dags list etc:

  • Not actually calling the dag function
  • Calling the dag function inside an if __name__ == "__main__" block
  • Accidentally overwriting a task function name, e.g. as a variable name.
  • Accidentally using the same dag_id as another dag defined in another file.

Questions and issues

  1. First few naive shutdowns of the standalone airflow process resulted in a relatively messy cleanup (multiple python and gunicorn processes left running, lots of log messages when DAG file parsing refreshes: {manager.py:523} INFO - Not time to refresh bundle)

  2. How best to ensure DAG definitions stay lightweight?

“[A DAG definition file] needs to evaluate quickly since the DAG File Processor checks it regularly for any changes.”

Apart from conformance to the framework architecture and diligent internal review, are there mechanisms in place to encourage good hygiene when it comes to preventing heavily executed code from finding its way into DAG definitions?

  3. Can we standardize DAG definition files without dragging around lots of boilerplate, or is each one its own domain?

Defining a set of default parameters for each task is usually more efficient and cleaner.

This seems like we're bound to passing around boilerplate or extending the BaseOperator as soon as we need to standardize task arguments, and it might get messy quickly when we start defining callback function lists.

  4. This error occasionally pops up in the UI, at various locations: 500 Internal Server Error

  5. Updating an existing DAG (example_workflow.py) to replace a taskflow-decorated function with an instance of airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator caused an ephemeral error in the Task Instances pane of the UI:

422 Validation Error
Invalid value for state. Valid values are removed, scheduled, queued, running, success, restarting, failed, up_for_retry, up_for_reschedule, upstream_failed, skipped, deferred

This cleared after a while though.

Observations and Takeaways

  • Airflow has good, detailed documentation and many official and unofficial tutorials are available.
  • Emphasis on the framework as a "batch pipeline system"
  • Parameterization seems a bit clunky. If not using default_args templating, config must be expressed through DAG triggers as JSON, then handled inside any triggered sub-DAG (or taskflow)

Notes

  • A new major version of Airflow, 3.0, was released at the end of April. There are some breaking changes from previous versions of airflow documented here. This is useful to keep in mind when using Airflow resources produced before April 2025.
  • Airflow has a useful list of Best Practices
  • The ticket associated with the creation of this wiki is here: https://github.com/casangi/RADPS/issues/12

For the future

Consider merging your code into a repository with a Scheduler running against it, which will allow your DAG to be triggered and executed daily

Other pages from the how-to guides:

Progressing towards a production deployment, other references will be necessary to configure Airflow beyond the basic standalone pypi-installed instance with SQLite backend, default DAGs, and simple authentication.

Database configuration pages:

Kubernetes deployment documentation:

Third-party documentation: