Airflow simple DAG example - unix1998/technical_notes GitHub Wiki

To use the BashOperator instead of the DummyOperator, import BashOperator from airflow.operators.bash_operator (in Airflow 2.x the preferred module is airflow.operators.bash; the older path shown here still works there but is deprecated) and define each task with it. The examples below show the earlier DummyOperator code rewritten to use BashOperator.

Using BashOperator in Combined DAG

combined_dag.py:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# Task-level defaults handed to the DAG below via ``default_args``.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),  # wait five minutes before the retry
}

# One DAG holding both tasks: scheduled every day from 2023-06-15 with
# catchup disabled. Operators created inside the ``with`` block are attached
# to this DAG, so they do not need an explicit ``dag=`` argument.
with DAG(
    dag_id='combined_dag',
    default_args=default_args,
    description='A simple combined DAG using BashOperator',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 6, 15),
    catchup=False,
) as dag:
    # First task: print a greeting.
    hello_world_task = BashOperator(
        task_id='hello_world_task',
        bash_command='echo "Hello World from Task 1"',
    )

    # Second task: print a second greeting.
    hello_world_task_2 = BashOperator(
        task_id='hello_world_task_2',
        bash_command='echo "Hello World from Task 2"',
    )

    # Task 2 runs only after task 1.
    hello_world_task.set_downstream(hello_world_task_2)

Using BashOperator with ExternalTaskSensor

dag1.py:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# Task-level defaults handed to the DAG below via ``default_args``.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),  # wait five minutes before the retry
}

# Upstream DAG: scheduled every day from 2023-06-15 with catchup disabled.
# Its id ('dag1') and its task id ('hello_world_task') are what another DAG
# would name in order to wait on this task.
with DAG(
    dag_id='dag1',
    default_args=default_args,
    description='First DAG using BashOperator',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 6, 15),
    catchup=False,
) as dag:
    # The single task of this DAG: print a greeting.
    hello_world_task = BashOperator(
        task_id='hello_world_task',
        bash_command='echo "Hello World from DAG 1"',
    )

dag2.py:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedelta

# Task-level defaults handed to the DAG below via ``default_args``.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),  # wait five minutes before the retry
}

# Downstream DAG: scheduled every day from 2023-06-15 with catchup disabled.
# NOTE(review): the sensor below presumably matches runs of 'dag1' by logical
# date, so this schedule and start date should stay aligned with dag1's.
with DAG(
    dag_id='dag2',
    default_args=default_args,
    description='Second DAG using BashOperator and ExternalTaskSensor',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 6, 15),
    catchup=False,
) as dag:
    # Gate: wait on task 'hello_world_task' of DAG 'dag1', in 'poke' mode.
    wait_for_task = ExternalTaskSensor(
        task_id='wait_for_task',
        external_dag_id='dag1',
        external_task_id='hello_world_task',
        mode='poke',
    )

    # Work: print a greeting once the gate has passed.
    hello_world_task_2 = BashOperator(
        task_id='hello_world_task_2',
        bash_command='echo "Hello World from DAG 2"',
    )

    # The greeting runs only after the sensor succeeds.
    wait_for_task.set_downstream(hello_world_task_2)

Using BashOperator with SubDAGs

main_dag.py:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

def create_subdag(parent_dag_name, child_dag_name, args):
    """Build and return the child DAG used as a SubDAG.

    The child DAG's id is ``"<parent_dag_name>.<child_dag_name>"``, it is
    scheduled ``"@daily"``, and it holds a single BashOperator task,
    ``hello_world_task_2``.

    Args:
        parent_dag_name: dag_id of the parent DAG (first half of the id).
        child_dag_name: name of the child (second half of the id).
        args: dict passed to the child DAG as ``default_args``.

    Returns:
        The newly created DAG with its one task attached.

    NOTE(review): no ``start_date`` is passed to the DAG here, so it
    presumably has to be supplied through ``args`` -- confirm at the caller.
    """
    subdag_id = f'{parent_dag_name}.{child_dag_name}'
    dag_subdag = DAG(
        dag_id=subdag_id,
        default_args=args,
        schedule_interval="@daily",
    )

    # Attach the task explicitly; no reference to it is needed afterwards.
    BashOperator(
        task_id='hello_world_task_2',
        bash_command='echo "Hello World from SubDAG Task 2"',
        dag=dag_subdag,
    )
    return dag_subdag

# Imported at module level rather than inside the DAG context below, so the
# dependency is visible up front and is not re-stated inside the DAG body.
from airflow.operators.subdag_operator import SubDagOperator

# Single source of truth for the first scheduled date of the main DAG and of
# the SubDAG's tasks.
START_DATE = datetime(2023, 6, 15)

# Defaults applied to every task of the main DAG and, because this same dict
# is handed to create_subdag(), to every task of the SubDAG as well.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # create_subdag() builds its DAG without a start_date of its own, so the
    # SubDAG's tasks must inherit one from here; Airflow refuses to add a task
    # to a DAG when neither the DAG nor the task has a start_date.
    'start_date': START_DATE,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),  # wait five minutes before the retry
}

# Main DAG: scheduled every day from START_DATE with catchup disabled.
# Operators created inside the ``with`` block are attached to this DAG
# automatically, so none of them passes ``dag=``.
with DAG(
    'main_dag',
    default_args=default_args,
    description='A main DAG with a SubDAG using BashOperator',
    schedule_interval=timedelta(days=1),
    start_date=START_DATE,
    catchup=False,
) as dag:
    # First task of the main DAG: print a greeting.
    hello_world_task = BashOperator(
        task_id='hello_world_task',
        bash_command='echo "Hello World from Main DAG Task 1"',
    )

    # Wrap the child DAG in a single task. The child name given to
    # create_subdag() equals this task_id, so the child DAG's id is
    # 'main_dag.hello_world_subdag'.
    subdag_task = SubDagOperator(
        task_id='hello_world_subdag',
        subdag=create_subdag('main_dag', 'hello_world_subdag', default_args),
    )

    # The SubDAG runs only after the greeting task.
    hello_world_task >> subdag_task

In these examples, the BashOperator executes simple bash commands, replacing the DummyOperator, which does no work and is typically used as a placeholder for demonstration purposes. Adjust the bash_command parameter to the actual commands you want your tasks to run.