airflow - xiaoxin01/Blog GitHub Wiki

什么是 Airflow

Apache Airflow 是一个开源的面向批处理的工作流平台,可以理解为一个加强版的 crontab,它可以让不同的 任务 之间拥有依赖关系。

Airflow 的设计思想是 workflow as code,任务都是用 python 代码编写,方便版本控管,但同时也代表着,它不适用于不会编码的人员,或者偏爱在 UI 上面点击而不是通过编码来设计任务的人。

Airflow 架构

Airflow 的结构如下:

Arthitecture

  • A scheduler ,它处理触发调度的 workflow ,并将任务提交给 executor 运行。
  • An executor ,处理正在运行的任务。 在默认的 Airflow 安装中,这会在 scheduler 中运行所有内容,但大多数适合生产的执行程序实际上会将任务执行推送给 workers 。
  • A webserver ,它提供了一个方便的用户界面来检查、触发和调试 DAG 和任务的行为。
  • DAG 文件的文件夹,由 scheduler 和 executor(以及执行程序拥有的任何 workers )读取
  • A metadata database,由 scheduler、executor 和 webserver 用来存储状态。

Airflow 中 每一个 workflow 由 DAG (a Directed Acyclic Graph) 来描述,一个 DAG 包含多个独立的 任务(Tasks),下图演示了一个由不同 Tasks 组成的 DAG:

DAG

DAG 由 Task 组成,Task 为 Airflow 的基本执行单元,不同的 Task 通过设定上下游依赖关系来完成 DAG 的执行。

Task 分为三种基本类型:

  • Operators ,预定义的任务模板,您可以将它们快速串在一起以构建 DAG 的大部分。
  • Sensors 是 Operator 的一个特殊子类,它完全是关于等待外部事件发生的。
  • 一个 TaskFlow 修饰的 @task,它是一个打包为任务的自定义 Python 函数。

默认自带的热门 Operators:

  • BashOperator - executes a bash command
  • PythonOperator - calls an arbitrary Python function
  • EmailOperator - sends an email
  • Use the @task decorator to execute an arbitrary Python function. It doesn’t support rendering jinja templates passed as arguments.

另外 provider package 包含的热门 operator:

  • SimpleHttpOperator
  • MySqlOperator
  • PostgresOperator
  • MsSqlOperator
  • OracleOperator
  • JdbcOperator
  • DockerOperator
  • HiveOperator
  • S3FileTransformOperator
  • PrestoToMySqlOperator
  • SlackAPIOperator

Sensors 用来等待某件事情发生之后,执行后续的 Task。这个事件可以是时间、文件被创建、外部事件等。有两种不同的模式:

  • poke (default): 占用一个 worker ,一直执行
  • reschedule: 检查的时候启动一个 worker,其他时间 sleep

一个简单的方法来确定使用哪种模式:每秒检查一次的东西应该处于 poke 模式,而每分钟检查一次的东西应该处于 reschedule 模式。

Executor 用来控制 task 运行,有两种类型:locally (在 scheduler 进程中运行) 和 remotely (workers 池)。Airflow 默认使用 SequentialExecutor 模式,如果是单机的话,建议更改为 LocalExecutor (可以启动多个进程,SequentialExecutor 相当于启动 1 个进程)。

可能遇到的问题

  1. 安全性

Airflow 的 workflow 是用 DAG folder 中的 python code 来描述,当有多个 project 需要接入 Airflow 的时候,在安全角度上需要考虑到代码的隔离性,一种做法是,让 Airflow 仅仅承担 task 的定义和依赖关系,具体的实现交由 remote api 来完成。

  1. 负载

当接入的 project 很多的时候,需要考虑到性能问题,最好是利用可以高度扩展的平台(比如 k8s)来实现 workers 的扩展。

参考:

⚠️ **GitHub.com Fallback** ⚠️