Airflow user - zhongjiajie/zhongjiajie.github.com GitHub Wiki

Airflow-user

You need to take care of when you use Apache Airflow in daily work.

依赖关系

根据committer成员的讨论,更加推荐使用位操作符(bitwise operators)来解决依赖

  • 最原始的方式task.set_upstream(task1); task.set_downstream(task2)

  • 位操作(bitwise operators)方式,airflow1.8之后task >> task1; task << task1; task >> task1 << task2

    • DAG的位操作符号可以提供更多的功能: t1 >> [t2, t3] >> t4已经能被支持了
  • 使用chain方法实现多个task的依次依赖(Airflow 1.10.3已经取消chain方法)

    • chain的一般使用

      from airflow.utils.helpers import chain
      chain(task, task1, task2)
    • 通过列表解析直接生成task列表然后chain起来

      from airflow.utils.helpers import chain
      ds_true = [DummyOperator(task_id='true_' + str(i), dag=dag) for i in [1, 2]]
      chain(cond_true, *ds_true)
  • 一对多的链接关系

    • t1 >> [t2, t3](推荐)
    • group = [task1, task2, task3]; task.set_downstream(group);
  • 多对一的链接关系

    • [t1, t2] >> t3(推荐)
    • group = [task1, task2, task3]; task.set_upstream(group)
  • 多对多的笛卡尔积

    • airflow.utils.helper.cross_downstream([t1, t2, t3], [t4, t5, t6])(推荐)

    • 使用自定义的方法

      import itertools
      import airflow.utils.helper.chain
      group_a=[task1, task2, task3]
      group_b=[task4, task5]
      for pair in itertools.product(group_a, group_b):
          chain(*pair)
  • 如果有根据一定条件选择下游执行哪个task操作的逻辑,可以使用BranchPythonOperator算子,使用是可以通过TriggerRule.ONE_SUCCESS设置实现.例如例子A是BranchPythonOperator,一个分支运行B,另一个分支运行C,同时B->C,这时可以在C中设置TriggerRule.ONE_SUCCESS.以前我总认为一个该这样实现,会多个两个算子

时间相关

  • 使用Airflow内置的日期宏
    • airflow内置了部分时间相关的参数,如'{{ ds }}'代表运行的时间,'{{ yesterday_ds }}'运行时间昨天的日期,更多时间相关的参数见这里
  • 自定义时间参数
    • 简单的时间操作: 通过replace完成,获取运行日期同时改变成特殊的时间some_command.sh {{ execution_date.replace(day=1) }}
    • 通过macros对时间进行更多操作: macros.ds_add将内置时间进行计算'{{ macros.ds_add(ds, 1) }}'

资源限制

作为一个工作流工具,除了完成各种复杂的上下游关系外,我认为解决资源的限制也是很重要的点.资源限制包括限制DAG的并发,限制多个DAG的运行关系.限制同一个/同一类Task的并发

DAG的限制

  • 限制dag并行实例数量
    • airflow.cfg[core]设置dag_concurrency限制并行数量

    • 在DAG文件中限制

      from airflow import DAG
      
      default_args = {
          'owner': 'airflow',
          # here to set value
          'concurrency': 10
      }
      
      dag = DAG(
          'tutorial',
          default_args=default_args,
          description='A simple tutorial DAG',
          schedule_interval=timedelta(days=1),
      )

Task的限制

  • 限制task的并行数量: operetor中的参数task_concurrency可以设置task的并行数量
  • 限制多个不同类型的task并行数量: operetor中的参数pool限制一类task的并行数量,与task_concurrency参数的区别是task_concurrency设置的是同一个task的并行数task_id要相同,pool设置的是一类task的并行数task_id可以不同,只要保证pool参数的名称相同就可以.设置后并行的task不会超过pool对象的slots

将Airflow中的对象通过DAG或者脚本的方式进行保存

connection variables pool

目前Airflow创建connection variables pool能通过如下方式创建:

  • Airflow的cli命令创建,对应命令分别为: airflow connections --add airflow variables -s airflow pools -s
  • Airflow的web UI页面进行设置,分别为: Admin -> connections Admin -> variables Admin -> pools

这里提供一个将connection variables pool固定到DAG的方法,查看这里.主要是之前使用docker-airflow每次重启时都会清空postgre数据,这样能保证connection variables pool能被git进行版本管理,下面以connections的创建为例子,进行说明

# conf.py
var = {
    'connections': [
        {
            'conn_id': 'ssh_my_own_1',
            'conn_type': 'ssh',
            'host': '127.0.0.2',
            'port': 22,
            'login': 'root',
            'password': 'pwd',
        },
        {
            'conn_id': 'ssh_my_own_2',
            'conn_type': 'ssh',
            'host': '127.0.0.3',
            'port': 22,
            'login': 'root',
            'password': 'pwd',
        },
        ...
    ]
}
# init_conn_var.py
from airflow import DAG, Connection
from airflow.setting import Session
from airflow.operators.python_operator import PythonOperator

def crt_airflow_conn(conf):
    conn = Connection()
    conn.conn_id = conf.get('conn_id')
    conn.conn_type = conf.get('conn_type')
    conn.host = conf.get('host')
    conn.port = conf.get('port')
    conn.login = conf.get('login')
    conn.password = conf.get('password')
    conn.schema = conf.get('schema')
    conn.extra = conf.get('extra')

    session = Session()
    try:
        exists_conn = session.query(Connection.conn_id == conn.conn_id).one()
    except exc.NoResultFound:
        logging.info('connection not exists, will create it.')
    else:
        logging.info('connection exists, will delete it before create.')
        session.delete(exists_conn)
    finally:
        session.add(conn)
        session.commit()
    session.close()

dag = DAG(
    dag_id='create_conn',
    schedule_interval='@once',
)

for connection in conf.get('connection'):
    crt_conn = PythonOperator(
        task_id='create_conn_{}'.format(connection.get('conn_id')),
        pyhton_callable=crt_airflow_conn,
        op_kwargs={'conf': connection},
        provide_context=False,
        dag=dag,
    )

用户创建

  • 通过Airflow UI页面进行创建Airflow -> Admin -> User,目前创建补支持密码

  • 使用cli命令行进行用户创建airflow create_user -r <ROLE> -u <USERNAME> -e <EMAIL> -p <PASSWORD>

  • 通过自定义脚本实现,将如下脚本放到AIRFLOW_HOME中,当需要创建用户的时候可以运行脚本进行交互式的创建

    import getpass
    
    import airflow
    from airflow import models, settings
    from airflow.contrib.auth.backends.password_auth import PasswordUser
    
    IS_CORRECT = "Y"
    HINT_THIS_SCRIPT = "\nhint!!\n==> YOU RUN THIS SCRIPT TO CREATE AIRFLOW USER NOW\n"
    HINT_USER = "Please enter username you want to create: "
    HINT_EMAIL_WITH_USER = "Please enter email for user `{username}`: "
    HINT_PASSWORD_WITH_USER = "Please enter password for user `{username}`: "
    HINT_CONFIRM_USER_PASSWORD = "\nhint!! > you want to add user `{username}` with email `{email}`\n" \
        "enter 'Y/y' to confirm the information\nor enter other key to reinput information\n>> "
    
    user = PasswordUser(models.User())
    
    while True:
        print(HINT_THIS_SCRIPT)
        user.username = input(HINT_USER)
        user.email = input(HINT_EMAIL_WITH_USER.format(username=user.username))
        user.password = getpass.getpass(HINT_PASSWORD_WITH_USER.format(username=user.username))
        correct = input(HINT_CONFIRM_USER_PASSWORD.format(username=user.username, email=user.email))
    
        if correct.strip().upper() == IS_CORRECT:
            break
    
    session = settings.Session()
    session.add(user)
    session.commit()
    session.close()
  • 从master中编译或者使用breeze运行时,这样创建用户:airflow users create --role Admin --username admin --email [email protected] --firstname jiajie --lastname zhong --password admin

DAG开发流程

调试

  • airflow创建及调试的顺序
    • 上传/更新DAG文件
    • 检测语法有没有错误python <文件名>
    • 使用airflow test运行单个taskairflow test DAG_ID TASK_ID execute_date

Airflow operator

这里记录几个Airflow里面常用但是比较难理解的operator

  • BranchPythonOperator: 通过不同的情况运行对应的下游task.通过python_callable参数的返回值确定下游要运行的task,返回值的名称就是要运行task的task_id
  • BashOperator: 运行bash命令,一般建议使用set -e开头,以防有bash命令失败但是状态还是成功,SshOperator的command同样的道理

docker-airflow

update at 2019-04-05: 已经在我的仓库中创建了zhongjiajie/docker-airflow定期将puckel/docker-airflow中优秀的PR合并到master,上面还有一套我自己使用的环境branch-custom

较常用的镜像是puckel/docker-airflow,这个镜像维护人的热度不高,且airflow官方的docker进行进行,后期可能会不使用这个版本.下面说明他可能存在的问题

  • 将数据库从postgresql切换到mysql:按照airflow官网的方式直接增加AIRFLOW__CORE__SQL_ALCHEMY_CONN变量没有效果,因为这个repo的scripts/entrypoint.sh有一句AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"指定了数据库的类型和链接信息,即使你在docker-compose指定了AIRFLOW__CORE__SQL_ALCHEMY_CONN也会被scripts/entrypoint.sh覆盖掉,目前比较可取的方法灵感来自是这个issue的答案,将AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"改成: "${AIRFLOW__CORE__SQL_ALCHEMY_CONN:="postgresql+psycopg2://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/$POSTGRES_DB"}"

用户实例

FAQ

  • 自定义operator后发现不能运行但是代码没有问题,有可能是自定义operator中的参数和Airflow内部变量的参数同名,如同ariflow date error中的原因,就是因为子自定义的Operator中定义了一个start_date变量,并把变量声明成template_fields导致的错误
  • airflow schedule_interval设置了@once之后dag一直hung,是因为airflow.cfg中的catchup_by_default设为了True,或者DAG默认参数设为了True,解决上面的问题,只要设置会正确值重启就可.has usage of @once for scheduler interval changed in v1.9

如何减少task之间的运行间隙

分布式任务如何运行

手动触发了任务之后是否会影响调度任务

BashOperator调用shell模板渲染失败

jinjia模板解析问题,调用shell脚本的时候,需要在*.sh后补一个空格,举例:

task_id='01_onions_first_ready_collecion',
bash_command='/bin/bash /home/test.sh ', #注意这后面有个空格
dag=dag

渲染模板同时有变量和模板

渲染模板中(如BahsOperator如果command)要使用python字符串命名变量的形式(如'{foo}_{bar}'.format(bar=bar, foo=foo),并且要用模板变量时,命令就要写成类似这样 'python {file} "{{{{ ds }}}}"'.format(file=file) 正常的字符串替换用一层{},模板变量用四层{}。 另外一个小提示是,ds 这种时间变量两边要带上引号,否则2017-02-13 00:00:00的变量会被 shell 认为是两个参数, 当然这可不是 airflow 的锅。

向opeartor的template中传参

大部分的operator都有template参数,有部分operator同时有param参数用于传参,但是有部分operator没有param参数,这个使用应该使用baseoperator中的params参数进行传参,因为template只是Jinja2,参考这里,以及官网,使用方式直接添加一个params参数就行

sshOperator(
    task_id='task_1_data_file_cleanup',
    bash_command="python cleanup.py --date {{ params.date }} 2>&1 >>  /tmp/airflow/data_dir_cleanup.log",
    params = {'date' : 'this-should-be-a-date'},
    dag=dag
)

外部触发DAG并传递参数

参考这里,通过--confflag传递参数,如{{ dag_run.conf['key'] }},如果operator中要使用时,就context['dag_run'].conf['key']拿回对应的参数

Ref


⚠️ **GitHub.com Fallback** ⚠️