Airflow notes - 9dian/Index GitHub Wiki
The log file ~/airflow/logs/scheduler/latest/${dagfilename}.log shows the following exception.
[2020-08-19 04:50:03,593] {jobs.py:343} DagFileProcessor16637774 INFO - Started process (PID=29598) to work on /root/airflow/dags/aipdc_dag_etl.py
[2020-08-19 04:50:03,597] {jobs.py:1521} DagFileProcessor16637774 INFO - Processing file /root/airflow/dags/aipdc_dag_etl.py for tasks to queue
[2020-08-19 04:50:03,597] {models.py:167} DagFileProcessor16637774 INFO - Filling up the DagBag from /root/airflow/dags/aipdc_dag_etl.py
[2020-08-19 04:50:04,042] {jobs.py:1535} DagFileProcessor16637774 INFO - DAG(s) ['aipdc_dag_etl'] retrieved from /root/airflow/dags/aipdc_dag_etl.py
[2020-08-19 04:50:04,059] {jobs.py:1169} DagFileProcessor16637774 INFO - Processing aipdc_dag_etl
[2020-08-19 04:50:04,085] {jobs.py:1173} DagFileProcessor16637774 INFO - Created <DagRun aipdc_dag_etl @ 2020-08-18 04:50:00: scheduled__2020-08-18T04:50:00, externally triggered: False>
[2020-08-19 04:50:04,087] {jobs.py:860} DagFileProcessor16637774 INFO - Examining DAG run <DagRun aipdc_dag_etl @ 2020-08-18 04:50:00: scheduled__2020-08-18T04:50:00, externally triggered: False>
[2020-08-19 04:50:04,092] {models.py:4024} DagFileProcessor16637774 INFO - Updating state for <DagRun aipdc_dag_etl @ 2020-08-18 04:50:00: scheduled__2020-08-18T04:50:00, externally triggered: False> considering 6 task(s)
[2020-08-19 04:50:04,106] {jobs.py:354} DagFileProcessor16637774 ERROR - Got an exception! Propagating...
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
pickle_dags)
File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
result = func(*args, **kwargs)
File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in process_file
self._process_dags(dagbag, dags, ti_keys_to_schedule)
File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1174, in _process_dags
self._process_task_instances(dag, tis_out)
File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 905, in _process_task_instances
session=session):
File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
result = func(*args, **kwargs)
File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1116, in are_dependencies_met
session=session):
File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1140, in get_failed_dep_statuses
dep_context):
File "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 94, in get_dep_statuses
for dep_status in self._get_dep_statuses(ti, session, dep_context):
File "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py", line 57, in _get_dep_statuses
previous_ti = ti.previous_ti
File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
result = func(*args, **kwargs)
File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1089, in previous_ti
return last_dagrun.get_task_instance(self.task_id, session=session)
File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
result = func(*args, **kwargs)
File "/usr/lib/python2.7/site-packages/airflow/models.py", line 3973, in get_task_instance
TI.task_id == task_id
File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2820, in one
raise orm_exc.NoResultFound("No row was found for one()")
NoResultFound: No row was found for one()
[2020-08-19 04:50:11,720] {jobs.py:343} DagFileProcessor16637789 INFO - Started process (PID=29921) to work on /root/airflow/dags/aipdc_dag_etl.py
[2020-08-19 04:50:11,723] {jobs.py:1521} DagFileProcessor16637789 INFO - Processing file /root/airflow/dags/aipdc_dag_etl.py for tasks to queue
[2020-08-19 04:50:11,723] {models.py:167} DagFileProcessor16637789 INFO - Filling up the DagBag from /root/airflow/dags/aipdc_dag_etl.py
[2020-08-19 04:50:12,126] {jobs.py:1535} DagFileProcessor16637789 INFO - DAG(s) ['aipdc_dag_etl'] retrieved from /root/airflow/dags/aipdc_dag_etl.py
[2020-08-19 04:50:12,144] {jobs.py:1169} DagFileProcessor16637789 INFO - Processing aipdc_dag_etl
[2020-08-19 04:50:12,154] {jobs.py:860} DagFileProcessor16637789 INFO - Examining DAG run <DagRun aipdc_dag_etl @ 2020-08-18 04:50:00: scheduled__2020-08-18T04:50:00, externally triggered: False>
[2020-08-19 04:50:12,159] {models.py:4024} DagFileProcessor16637789 INFO - Updating state for <DagRun aipdc_dag_etl @ 2020-08-18 04:50:00: scheduled__2020-08-18T04:50:00, externally triggered: False> considering 6 task(s)
[2020-08-19 04:50:12,172] {jobs.py:354} DagFileProcessor16637789 ERROR - Got an exception! Propagating...
ref: https://issues.apache.org/jira/browse/AIRFLOW-1017
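Fix: open /usr/lib/python2.7/site-packages/airflow/models.py with vim and change one() to first() in get_task_instance, so that a missing task instance row returns None instead of raising NoResultFound.
Note: for comparison, see the Airflow 1.10 source at /usr/local/lib/python3.6/dist-packages/airflow/models/dagrun.py (lines 192-209); search for the phrase "Returns the task instance specified by task_id for this dag run".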
@provide_session
def get_task_instance(self, task_id, session=None):
    """
    Returns the task instance specified by task_id for this dag run
    :param task_id: the task id
    """

    TI = TaskInstance
    ti = session.query(TI).filter(
        TI.dag_id == self.dag_id,
        TI.execution_date == self.execution_date,
        TI.task_id == task_id
    # @20200819
    #).one()
    ).first()

    return ti

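The effect of this change can be illustrated without a database. The following is a plain-Python sketch of the two behaviors, not SQLAlchemy itself: `one` and `first` are hypothetical stand-ins that operate on a list of rows, and `NoResultFound` is a local stand-in class for the SQLAlchemy exception of the same name.

```python
class NoResultFound(Exception):
    """Local stand-in for sqlalchemy.orm.exc.NoResultFound."""


def one(rows):
    # Mimics Query.one(): exactly one row is required.
    if not rows:
        raise NoResultFound("No row was found for one()")
    if len(rows) > 1:
        raise ValueError("Multiple rows were found for one()")
    return rows[0]


def first(rows):
    # Mimics Query.first(): the first row, or None when there are none.
    return rows[0] if rows else None


# A task added after the DAG was deployed has no row in earlier DAG runs.
previous_rows = []
print(first(previous_rows))
try:
    one(previous_rows)
except NoResultFound as exc:
    print("one() raised:", exc)
```

With first(), the caller receives None for the missing previous task instance and has to cope with that value, instead of the scheduler process dying on the exception.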
Use ps -ef | grep airflow to find the processes that belong to the task,
or choose the grep keyword from the task log, e.g. ps -ef | grep dws_
[2020-08-19 17:59:46,180] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2020-08-19 17:59:46,180] {base_task_runner.py:95} INFO - Subtask: Starting attempt 1 of 2
[2020-08-19 17:59:46,180] {base_task_runner.py:95} INFO - Subtask: --------------------------------------------------------------------------------
[2020-08-19 17:59:46,180] {base_task_runner.py:95} INFO - Subtask:
[2020-08-19 17:59:46,186] {base_task_runner.py:95} INFO - Subtask: [2020-08-19 17:59:46,186] {models.py:1342} INFO - Executing <Task(BashOperator): dws_stage> on 2020-08-18 17:57:00
[2020-08-19 17:59:46,198] {base_task_runner.py:95} INFO - Subtask: [2020-08-19 17:59:46,198] {bash_operator.py:71} INFO - tmp dir root location:
[2020-08-19 17:59:46,198] {base_task_runner.py:95} INFO - Subtask: /tmp
[2020-08-19 17:59:46,199] {base_task_runner.py:95} INFO - Subtask: [2020-08-19 17:59:46,198] {bash_operator.py:80} INFO - Temporary script location :/tmp/airflowtmpiqcsCr//tmp/airflowtmpiqcsCr/dws_stages9ZAbL
[2020-08-19 17:59:46,199] {base_task_runner.py:95} INFO - Subtask: [2020-08-19 17:59:46,198] {bash_operator.py:81} INFO - Running command: /usr/local/aipdc.curr/dws_stage.sh 20200818 20200819
[2020-08-19 17:59:46,205] {base_task_runner.py:95} INFO - Subtask: [2020-08-19 17:59:46,204] {bash_operator.py:90} INFO - Output:
[root@mrpl100 logs]# ps -ef | grep dws
root 41350 38062 2 16:43 ? 00:00:53 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl dws_stage 2020-08-18T16:23:00 --local -sd /root/airflow/dags/aipdc_dag_etl.py
root 41481 41350 0 16:43 ? 00:00:06 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl dws_stage 2020-08-18T16:23:00 --job_id 35372 --raw -sd DAGS_FOLDER/aipdc_dag_etl.py
root 41668 41481 0 16:43 ? 00:00:00 bash /tmp/airflowtmpoLwCah/dws_stageHGkCId
root 41669 41668 0 16:43 ? 00:00:00 /bin/bash /usr/local/aipdc.curr/dws_stage.sh 20200818 20200819
root 47571 26079 0 16:46 pts/22 00:00:00 vim dws_stage.sh
root 53131 33389 0 17:14 pts/20 00:00:00 grep --color=auto dws
[root@mrpl100 logs]# ps -ef|grep 38062
root 38062 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 41350 38062 2 16:43 ? 00:00:55 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl dws_stage 2020-08-18T16:23:00 --local -sd /root/airflow/dags/aipdc_dag_etl.py
root 55719 33389 0 17:15 pts/20 00:00:00 grep --color=auto 38062
[root@mrpl100 logs]# ps -ef |grep airflow
root 4264 1 3 2019 ? 9-14:01:43 /usr/bin/python /usr/bin/airflow webserver -p 8181
root 4352 4264 0 2019 ? 02:25:54 gunicorn: master [airflow-webserver]
root 19394 1 0 2019 ? 04:29:45 /usr/bin/python /usr/bin/airflow run ac_dag_etl oss_to_hive 2019-08-04T00:00:00 --local -sd /root/airflow/dags/ac_dag_etl.py
root 19447 19394 0 2019 ? 00:00:01 /usr/bin/python /usr/bin/airflow run ac_dag_etl oss_to_hive 2019-08-04T00:00:00 --job_id 33687 --raw -sd DAGS_FOLDER/ac_dag_etl.py
root 35908 3506 0 16:15 pts/24 00:00:00 tail -200f /root/airflow/logs/scheduler/latest/aipdc_dag_etl.py.log
root 38058 1 1 May06 ? 1-16:48:02 /usr/bin/python /usr/bin/airflow scheduler -D
root 38060 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38061 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38062 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38063 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38064 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38065 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38066 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38067 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38068 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38069 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38070 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38071 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38073 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38074 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38075 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38076 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38077 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38078 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38079 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38080 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38081 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38082 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38083 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38084 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38085 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38086 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38087 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38088 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38089 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38090 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38091 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 38092 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 41350 38062 2 16:43 ? 00:00:56 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl dws_stage 2020-08-18T16:23:00 --local -sd /root/airflow/dags/aipdc_dag_etl.py
root 41481 41350 0 16:43 ? 00:00:06 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl dws_stage 2020-08-18T16:23:00 --job_id 35372 --raw -sd DAGS_FOLDER/aipdc_dag_etl.py
root 41668 41481 0 16:43 ? 00:00:00 bash /tmp/airflowtmpoLwCah/dws_stageHGkCId
root 46401 38080 0 Aug10 ? 00:47:04 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl interaction_and_control_load 2020-08-09T04:50:00 --local -sd /root/airflow/dags/aipdc_dag_etl.py
root 46503 46401 0 Aug10 ? 00:00:07 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl interaction_and_control_load 2020-08-09T04:50:00 --job_id 35307 --raw -sd DAGS_FOLDER/aipdc_dag_etl.py
root 52496 4352 6 17:13 ? 00:00:06 [ready] gunicorn: worker [airflow-webserver]
root 53650 4352 9 17:14 ? 00:00:06 [ready] gunicorn: worker [airflow-webserver]
root 54909 4352 16 17:15 ? 00:00:05 [ready] gunicorn: worker [airflow-webserver]
root 56009 4352 99 17:15 ? 00:00:06 [ready] gunicorn: worker [airflow-webserver]
root 56337 38058 0 17:15 ? 00:00:00 [airflow] <defunct>
root 56338 38058 0 17:15 ? 00:00:00 [airflow] <defunct>
root 56345 33389 0 17:15 pts/20 00:00:00 grep --color=auto airflow
[root@mrpl100 logs]#
The processes found are:
root 41350 38062 2 16:43 ? 00:00:56 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl dws_stage 2020-08-18T16:23:00 --local -sd /root/airflow/dags/aipdc_dag_etl.py
root 41481 41350 0 16:43 ? 00:00:06 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl dws_stage 2020-08-18T16:23:00 --job_id 35372 --raw -sd DAGS_FOLDER/aipdc_dag_etl.py
root 41668 41481 0 16:43 ? 00:00:00 bash /tmp/airflowtmpoLwCah/dws_stageHGkCId
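The chain above can be recovered from ps -ef output by following the PPID column (third field) from the `airflow run ... --local` process downwards. A minimal sketch, assuming the default ps -ef column order; the function name `descendants` is hypothetical and the sample lines are copied from the output shown earlier:

```python
def descendants(ps_lines, root_pid):
    """Return root_pid followed by all of its descendants, parents first."""
    children = {}
    for line in ps_lines:
        fields = line.split()
        if len(fields) < 3 or not fields[1].isdigit() or not fields[2].isdigit():
            continue  # skip the header line and anything malformed
        pid, ppid = int(fields[1]), int(fields[2])
        children.setdefault(ppid, []).append(pid)
    ordered, queue = [], [root_pid]
    while queue:
        pid = queue.pop(0)
        ordered.append(pid)
        queue.extend(children.get(pid, []))
    return ordered


sample = [
    "root 41350 38062 2 16:43 ? 00:00:56 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl dws_stage 2020-08-18T16:23:00 --local -sd /root/airflow/dags/aipdc_dag_etl.py",
    "root 41481 41350 0 16:43 ? 00:00:06 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl dws_stage 2020-08-18T16:23:00 --job_id 35372 --raw -sd DAGS_FOLDER/aipdc_dag_etl.py",
    "root 41668 41481 0 16:43 ? 00:00:00 bash /tmp/airflowtmpoLwCah/dws_stageHGkCId",
    "root 41669 41668 0 16:43 ? 00:00:00 /bin/bash /usr/local/aipdc.curr/dws_stage.sh 20200818 20200819",
]
print(descendants(sample, 41350))
```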
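Use kill -9 19447 to end the process. The Airflow web console then shows the task as failed (and starts a retry), and the log file shows "... Task exited with return code -9".
A successful run logs "... Task exited with return code 0".
A code exception or error logs "... Task exited with return code 1".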
[2020-08-19 17:26:13,272] {base_task_runner.py:95} INFO - Subtask: [2020-08-19 17:26:13,272] {models.py:1462} ERROR - Bash command failed
[2020-08-19 17:26:13,395] {base_task_runner.py:95} INFO - Subtask: Traceback (most recent call last):
[2020-08-19 17:26:13,395] {base_task_runner.py:95} INFO - Subtask: File "/usr/bin/airflow", line 28, in <module>
[2020-08-19 17:26:13,395] {base_task_runner.py:95} INFO - Subtask: args.func(args)
[2020-08-19 17:26:13,396] {base_task_runner.py:95} INFO - Subtask: File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 422, in run
[2020-08-19 17:26:13,396] {base_task_runner.py:95} INFO - Subtask: pool=args.pool,
[2020-08-19 17:26:13,396] {base_task_runner.py:95} INFO - Subtask: File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
[2020-08-19 17:26:13,396] {base_task_runner.py:95} INFO - Subtask: result = func(*args, **kwargs)
[2020-08-19 17:26:13,396] {base_task_runner.py:95} INFO - Subtask: File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
[2020-08-19 17:26:13,396] {base_task_runner.py:95} INFO - Subtask: result = task_copy.execute(context=context)
[2020-08-19 17:26:13,396] {base_task_runner.py:95} INFO - Subtask: File "/usr/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 100, in execute
[2020-08-19 17:26:13,396] {base_task_runner.py:95} INFO - Subtask: raise AirflowException("Bash command failed")
[2020-08-19 17:26:13,397] {base_task_runner.py:95} INFO - Subtask: airflow.exceptions.AirflowException: Bash command failed
[2020-08-19 17:26:16,293] {jobs.py:2083} INFO - Task exited with return code 1
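The three return codes match the usual POSIX convention as exposed by Python's subprocess module: 0 for success, a positive exit status for an error, and the negative signal number when the process was killed by a signal. A small sketch, assuming a POSIX system; it does not involve Airflow, and the child commands are made up for illustration:

```python
import signal
import subprocess
import sys

# Start a child that would sleep for a minute, then kill it the hard way.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
child.send_signal(signal.SIGKILL)  # the same signal that `kill -9 <pid>` sends
child.wait()
print(child.returncode)  # negative signal number on POSIX

# Children that exit normally report their own exit status.
ok = subprocess.call([sys.executable, "-c", "raise SystemExit(0)"])
failed = subprocess.call([sys.executable, "-c", "raise SystemExit(1)"])
print(ok, failed)
```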
In addition, you may see hung "ancient" zombie processes, for example:
root 38062 38058 0 May06 ? 00:00:00 /usr/bin/python /usr/bin/airflow scheduler -D
root 41350 38062 2 16:43 ? 00:00:55 /usr/bin/python /usr/bin/airflow run aipdc_dag_etl dws_stage 2020-08-18T16:23:00 --local -sd /root/airflow/dags/aipdc_dag_etl.py
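These can likewise be ended with kill -9.
Clear is mainly for failed tasks: when a task still fails after reaching its retry limit, clear it to restart the task (this backfills part of the data).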
Task Instance Details
Dependencies Blocking Task From Getting Scheduled
Dependency | Reason |
---|---|
Dagrun Running | Task instance's dagrun was not in the 'running' state but in the state 'success'. |
Task Instance State | Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run. |
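Mark Success is mainly for the case where a new task has been added to the DAG file. By default, Airflow does not automatically run a task that was added after the DAG was deployed, and version 1.8.0 additionally raises No row was found for one(). In that case, click the task node and, in the popup shown in the figure above, choose "View Log" (this may fail with an error for black squares) => "Task Instance Details", which gives the following message.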
depends_on_past is true for this task, but the previous task instance is in the state 'None' which is not a successful state.
In that case, Mark Success the most recent white node (square, the state 'None'); the newly added task will then run automatically (triggered at the next scheduling point).
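Why Mark Success unblocks the new task can be sketched as a simplified model of the depends_on_past check. This is not Airflow's code: the function name `blocked_by_past` is hypothetical, and treating both 'success' and 'skipped' as successful states is an assumption that may differ between versions.

```python
SUCCESSFUL_STATES = {"success", "skipped"}  # assumption, see the note above


def blocked_by_past(depends_on_past, previous_state):
    """Return the blocking reason, or None if the task may be scheduled."""
    if not depends_on_past:
        return None
    if previous_state in SUCCESSFUL_STATES:
        return None
    return ("depends_on_past is true for this task, but the previous task "
            "instance is in the state '%s' which is not a successful state."
            % previous_state)


# Newly added task: the previous run has no state for it (white square).
print(blocked_by_past(True, None))
# After Mark Success on that white square the dependency is satisfied.
print(blocked_by_past(True, "success"))
```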
ref: https://stackoverflow.com/questions/40651783/airflow-how-to-delete-a-dag https://issues.apache.org/jira/browse/AIRFLOW-1002
I just wrote a script that deletes everything related to a particular dag, but this is only for MySQL. You can write a different connector method if you are using PostgreSQL. Originally the commands were posted by Lance on https://groups.google.com/forum/#!topic/airbnb_airflow/GVsNsUxPRC0 I just put them in a script. Hope this helps. Format: python script.py dag_id
import sys
import MySQLdb

dag_input = sys.argv[1]

# A list (not a set) keeps the delete order fixed; %s is a bound parameter,
# so the dag_id is never pasted into the SQL text.
queries = [
    'delete from xcom where dag_id = %s',
    'delete from task_instance where dag_id = %s',
    'delete from sla_miss where dag_id = %s',
    'delete from log where dag_id = %s',
    'delete from job where dag_id = %s',
    'delete from dag_run where dag_id = %s',
    'delete from dag where dag_id = %s',
]

def connect(query, dag_id):
    # Replace the placeholder settings with those of the Airflow metadata database.
    db = MySQLdb.connect(host="hostname", user="username", passwd="password", db="database")
    cur = db.cursor()
    cur.execute(query, (dag_id,))
    db.commit()
    db.close()

for value in queries:
    print('%s [dag_id=%s]' % (value, dag_input))
    connect(value, dag_input)
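- Generate the encrypted password with Python code.
import bcrypt
# hashpw expects bytes; the b'' literal works on both Python 2 and Python 3.
print(bcrypt.hashpw(b'123', bcrypt.gensalt(12)).decode('utf-8'))
- Update the password field of the relevant rows in the users table.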
mysql> select * from users;
+----+----------------+--------------------------------+--------------------------------------------------------------+
| id | username | email | password |
+----+----------------+--------------------------------+--------------------------------------------------------------+
| 1 | q | q | $2b$12$V3Sy5icwW6TMKSRKOGZZLuIs3RR16YwcTcXKkLhHBxM22guieZg3y |
| 4 | xi | xia | $2b$12$.StE6ces1elR4AP8FatEMuTOoOAKmPNG2PEiqulCEfRpj51HI.l1q |
| 5 | liuq | ex_liu | $2b$12$wAgPqXKOukrVr.bsEcD9E.xaCqydrqvvlWH.7Mw.kWn5epNxdwHmq |
+----+----------------+--------------------------------+--------------------------------------------------------------+
Python
# navigate to the airflow installation directory
$ cd ~/airflow
$ python
Python 2.7.9 (default, Feb 10 2015, 03:28:08)
Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.email = 'new_user_email@example.com'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'admin'
user.email = ''
user.password = '*'
user.superuser = 1
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
Command line (RBAC UI)
airflow users create --username admin --role Admin --password ***12**** --email '' --firstname '' --lastname ''
The web authenticate setting has no effect in the RBAC UI, so disable it.
The bash_command value of a BashOperator must end with a space character.
speech_dialog_et = BashOperator(
    task_id='speech_dialog_et',
    bash_command='{{ params.shell_dir }}/speech_dialog_et.sh {{ yesterday_ds }} {{ ds_nodash }} ',
    params={'shell_dir': script_dir},
    retries=1,
    dag=dag,
)
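A likely reason for the trailing space, stated here as an assumption rather than something these notes confirm: when the whole bash_command string ends with one of the operator's template extensions, Airflow tries to load it as a Jinja template file instead of running it as a command. The sketch below only models that suffix check; `TEMPLATE_EXT` and `treated_as_template_file` are hypothetical names, and the extension tuple is assumed to mirror BashOperator.template_ext.

```python
TEMPLATE_EXT = (".sh", ".bash")  # assumed to mirror BashOperator.template_ext


def treated_as_template_file(bash_command):
    """True if the whole string would be looked up as a template file."""
    return bash_command.endswith(TEMPLATE_EXT)


# A bare script path ends in .sh and would be looked up as a template file.
print(treated_as_template_file("/usr/local/aipdc.curr/dws_stage.sh"))
# The same path with a trailing space is treated as a plain command string.
print(treated_as_template_file("/usr/local/aipdc.curr/dws_stage.sh "))
# A command that ends with arguments is unaffected either way.
print(treated_as_template_file(
    "{{ params.shell_dir }}/speech_dialog_et.sh {{ yesterday_ds }} {{ ds_nodash }} "))
```

Under this model the space matters most for commands that consist only of a script path; always adding it, as the note above recommends, is the safe habit.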
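The literal meaning of backfill is hard to grasp, but after checking the documentation and observing its behavior, it can be understood simply as going back and running the executions that were missed at their scheduled times. This can be done manually: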
airflow backfill aipdc*_et -s 2015-06-01 -e 2015-06-07
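This backfills every task run between the start of the period given by -s and the end of the period given by -e. Backfilling means running everything that has not yet been run.
The idea behind this feature is good, but when it is triggered automatically it can cause very unexpected problems if you are not careful.
The default_args given when configuring a DAG include a start_date parameter. Unless the DAG is told not to catch up, Airflow by default backfills every run between start_date and the current system time. If this parameter is set badly it brings a frightening amount of backfilling, so I usually disable catch-up.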
the_dag = DAG(
    '**_dag',
    default_args=default_args,
    schedule_interval=u'0 0 * * *',
    catchup=False)
Specify catchup=False so that the DAG starts from the most recent schedule point. For details, see Catchup in the official documentation.
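The difference between the two settings can be sketched with a simplified model of a daily schedule. This is not Airflow's scheduler: the function name `execution_dates` is hypothetical, and the model only assumes that a run for execution date D becomes due once D plus one interval has passed, which matches the scheduler log earlier in these notes (the run for 2020-08-18 04:50 was created at 2020-08-19 04:50).

```python
from datetime import datetime, timedelta


def execution_dates(start_date, now, catchup, interval=timedelta(days=1)):
    """List the execution dates for which runs would be created.

    A run for execution date D is only due once D + interval has passed.
    """
    due = []
    current = start_date
    while current + interval <= now:
        due.append(current)
        current += interval
    if not catchup:
        due = due[-1:]  # keep only the most recent interval
    return due


start = datetime(2015, 6, 1)
now = datetime(2015, 6, 8, 0, 5)
print(len(execution_dates(start, now, catchup=True)))
print(execution_dates(start, now, catchup=False))
```

With a start_date years in the past, the catchup=True list grows to one run per day for the whole period, which is the "frightening" backfill described above.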
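When using hive -e or beeline -e, the HiveQL/Spark SQL used "-- sql ;" lines as comments (note the trailing ;). Running the hive -e or beeline -e command directly, or placing it in a bash shell script and running that script, works without problems. But if the script runs as a task (BashOperator) of an Airflow 1.8 DAG, it hangs at the first "-- sql ;" comment line.
# added @20200818: Hive SQL in a shell script called by Airflow must not contain comments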
HADOOP_USER_NAME=hive hive -e " \
set io.sort.mb=512;
set hive.exec.compress.intermediate=true;
set mapreduce.map.memory.mb=2048;
set mapreduce.reduce.memory.mb=10240;
set mapreduce.map.java.opts=-Xmx1524m;
set mapreduce.reduce.java.opts=-Xmx8192m;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nostrict;
-- real-time data must be processed with hive, each day's speaker data goes into tmp.daily_speaker;
insert into tmp.daily_speaker partition (year_month)
select regexp_replace(part_dt, '-', '') domoy, case length(speaker_device_id) when 22 then concat(concat('00001C', speaker_device_id), '0000') else speaker_device_id end sn, 2, $year_month from tmp.speaker_orionstar where part_dt = '${date_start}' group by part_dt, speaker_device_id ;
-- each day's speaker data from aipdc.intent goes into tmp.daily_speaker;
insert into tmp.daily_speaker partition (year_month)
select regexp_replace('${date_start}', '-', ''), sn, 1, $year_month from aipdc.intent where year_month=$year_month and to_date(time) = '${date_start}' and speech_source='speaker' group by sn;
"
beeline -u 'jdbc:hive2://ip:10009/aipdc;' -n hive -e " \
set io.sort.mb=512;
set hive.exec.compress.intermediate=true;
set mapreduce.map.memory.mb=2048;
set mapreduce.reduce.memory.mb=10240;
set mapreduce.map.java.opts=-Xmx1524m;
set mapreduce.reduce.java.opts=-Xmx8192m;
-- will be replaced ;
-- insert into aipdc.device_accumulate_stage(domoy, newdevice_cnt, device_cnt, speech_source);
insert into aipdc.device_accumulate_stage
select t2.domoy, COALESCE(t1.newdevice_cnt, 0) newdevice_cnt, (t2.pre_device_cnt + coalesce(t1.newdevice_cnt, 0)) as device_cnt, t2.speech_source, null, null from (
select regexp_replace('${date_start}', '-', '') as domoy, count(distinct sn) newdevice_cnt, case when speech_source is null then 'Total' else speech_source end as speech_source from aipdc.intent t where year_month = $year_month and to_date(t.time) = '${date_start}' and not exists (
select 1 from aipdc.intent n where to_date(n.time) < '${date_start}' and t.sn = n.sn
) group by t.speech_source with rollup
) t1 right join (
select regexp_replace('${date_start}', '-', '') as domoy, device_cnt as pre_device_cnt, speech_source from aipdc.device_accumulate_stage where domoy = regexp_replace(date_sub('${date_start}', 1), '-', '')
) t2 on t1.domoy = t2.domoy and t1.speech_source = t2.speech_source;
-- retention metrics ;
insert into aipdc.device_retention_stage
select t1.domoy, t1.c1d, t2.c2d_excluding_end, t3.c3d_excluding_end, t7.c7d_excluding_end, c60_silent, t1.speech_source, null, null from (
select regexp_replace('${date_start}', '-', '') as domoy, count(distinct sn) c1d, case when speech_source is null then 'Total' else speech_source end as speech_source from aipdc.intent t where to_date(t.time) = date_sub('${date_start}', 1) and exists (
select 1 from aipdc.intent n where to_date(n.time) = '${date_start}' and n.sn = t.sn
) group by speech_source with rollup
) t1 left join (
select regexp_replace('${date_start}', '-', '') as domoy, count(distinct sn) c2d_excluding_end, case when speech_source is null then 'Total' else speech_source end as speech_source from aipdc.intent t where to_date(t.time) = date_sub('${date_start}', 2) and exists (
select 1 from aipdc.intent n where to_date(n.time) > date_sub('${date_start}', 2) and to_date(n.time) < '${date_start}' and n.sn = t.sn
) group by speech_source with rollup
) t2 on t1.domoy = t2.domoy and t1.speech_source = t2.speech_source join (
select regexp_replace('${date_start}', '-', '') as domoy, count(distinct sn) c3d_excluding_end, case when speech_source is null then 'Total' else speech_source end as speech_source from aipdc.intent t where to_date(t.time) = date_sub('${date_start}', 3) and exists (
select 1 from aipdc.intent n where to_date(n.time) > date_sub('${date_start}', 3) and to_date(n.time) < '${date_start}' and n.sn = t.sn
) group by speech_source with rollup
) t3 on t1.domoy = t3.domoy and t1.speech_source = t3.speech_source join (
select regexp_replace('${date_start}', '-', '') as domoy, count(distinct sn) c7d_excluding_end, case when speech_source is null then 'Total' else speech_source end as speech_source from aipdc.intent t where to_date(t.time) = date_sub('${date_start}', 7) and exists (
select 1 from aipdc.intent n where to_date(n.time) > date_sub('${date_start}', 7) and to_date(n.time) < '${date_start}' and n.sn = t.sn
) group by speech_source with rollup
) t7 on t1.domoy = t7.domoy and t1.speech_source = t7.speech_source join (
select regexp_replace('${date_start}', '-', '') as domoy, count(distinct sn) c60_silent, case when speech_source is null then 'Total' else speech_source end as speech_source from aipdc.intent t where to_date(t.time) = date_sub('${date_start}', 60) and not exists (
select 1 from aipdc.intent n where to_date(n.time) > date_sub('${date_start}', 60) and to_date(n.time) <= '${date_start}' and n.sn = t.sn
) group by speech_source with rollup
) t60 on t1.domoy = t60.domoy and t1.speech_source = t60.speech_source;
-- ### begin: daily summary 'basic device-count metrics, e.g. cumulative, new, daily active' ;
-- # 1)
insert into aipdc.device_base_stage
select COALESCE(t1.domoy,t2.domoy,t3.domoy), COALESCE(t2.new_cnt,0) new_cnt, (t3.pre_device_cnt + coalesce(t2.new_cnt,0)) as accumulate_cnt, COALESCE(t1.active_cnt, 0), null active2_cnt, COALESCE(t1.speech_source,t2.speech_source,t3.speech_source) speech_source, COALESCE(t1.category_no, t2.category_no, t3.category_no) category_no from (
select regexp_replace('${date_start}','-','') domoy, count(distinct sn) active_cnt, speech_source, category_no from aipdc.intent t where year_month = $year_month and speech_source != 'speaker' and to_date(t.time) = '${date_start}' group by t.speech_source, category_no
) t1 full join (
select regexp_replace('${date_start}','-','') as domoy, count(distinct sn) new_cnt, speech_source, category_no from aipdc.intent t where year_month = $year_month and speech_source != 'speaker' and to_date(t.time) = '${date_start}' and not exists ( select 1 from aipdc.intent n where to_date(n.time) < '${date_start}' and t.sn = n.sn) group by t.speech_source, category_no
) t2 on t1.domoy = t2.domoy and t1.speech_source = t2.speech_source and t1.category_no = t2.category_no full join (
select regexp_replace('${date_start}','-','') as domoy, coalesce(accumulate_cnt,0) pre_device_cnt, speech_source, category_no from aipdc.device_base_stage where speech_source != 'speaker' and domoy = regexp_replace(date_sub('${date_start}',1),'-','')
) t3 on t1.domoy = t3.domoy and t1.speech_source = t3.speech_source and t1.category_no = t3.category_no;
-- # 2) Xiaomei speaker ;
insert into aipdc.device_base_stage
select t1.domoy, COALESCE(t2.new_cnt, 0) new_cnt, (coalesce(t3.pre_device_cnt, 0) + coalesce(t2.new_cnt, 0)) as accumulate_cnt, t1.active_cnt, null active2_cnt, t1.speech_source, t1.category_no from (
select regexp_replace('${date_start}', '-', '') as domoy, count(distinct sn) active_cnt, 'speaker' as speech_source, '1C' as category_no from tmp.daily_speaker t where domoy = regexp_replace('${date_start}', '-', '')
) t1 left join (
select regexp_replace('${date_start}', '-', '') as domoy, count(distinct sn) new_cnt, 'speaker' speech_source, '1C' category_no from tmp.daily_speaker t where t.domoy = regexp_replace('${date_start}', '-', '') and not exists ( select 1 from tmp.daily_speaker n where n.domoy < regexp_replace('${date_start}', '-', '') and t.sn = n.sn)
) t2 on t1.domoy = t2.domoy and t1.speech_source = t2.speech_source and t1.category_no = t2.category_no left join (
select regexp_replace('${date_start}', '-', '') as domoy, coalesce(accumulate_cnt, 0) pre_device_cnt, speech_source, category_no from aipdc.device_base_stage where speech_source = 'speaker' and domoy = regexp_replace(date_sub('${date_start}', 1), '-', '')
) t3 on t1.domoy = t3.domoy and t1.speech_source = t3.speech_source and t1.category_no = t3.category_no;
-- ### end: daily summary 'basic device-count metrics, e.g. cumulative, new, daily active' ;
-- ### begin: cumulative interaction count ;
insert into aipdc.device_interacted_stage
select regexp_replace('${date_start}', '-', '') domoy, (t1.prev_iaccum_cnt + coalesce(t2.dicnt, 0)) iaccum_cnt, coalesce(t2.speech_source, t1.speech_source), coalesce(t2.category_no, t1.category_no) from (
select domoy, iaccum_cnt prev_iaccum_cnt, speech_source, category_no from aipdc.device_interacted_stage where domoy=regexp_replace(date_sub('${date_start}', 1), '-', '')
) t1 full join (
select regexp_replace('${date_start}', '-', '') as domoy, count(1) dicnt, speech_source, category_no from aipdc.intent where year_month=$year_month and to_date(time) = '${date_start}' and speech_source != 'speaker' group by speech_source, category_no
union all
select domoy, sum(dicnt) as dicnt, speech_source, category_no from (
select regexp_replace('${date_start}', '-', '') as domoy, count(1) dicnt, speech_source, category_no from aipdc.intent where year_month=$year_month and to_date(time) = '${date_start}' and speech_source = 'speaker' group by speech_source, category_no
union all
select regexp_replace(part_dt, '-', '') domoy, count(1) dicnt, 'speaker' as speech_source, '1C' as category_no from tmp.speaker_orionstar where part_dt = '${date_start}' group by part_dt
) t group by domoy, speech_source, category_no
) t2 on t1.speech_source = t2.speech_source and t1.category_no = t2.category_no;
-- ### end: cumulative interaction count ;
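The script above hangs when run under Airflow 1.8. Once Airflow 1.8 has run this task of the DAG, resolve it as described in the section on tasks hanging in an Airflow DAG ("airflow-dag中的task挂住...").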
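One way to follow the rule that Hive SQL called from Airflow must not contain comments is to strip the comment lines before the SQL text reaches hive -e or beeline -e. A minimal sketch, not verified against the hang described above: the function name `strip_sql_comment_lines` is hypothetical, and it only removes lines that consist entirely of a `--` comment, so a `--` that follows code on the same line is left alone.

```python
def strip_sql_comment_lines(sql):
    """Drop every line that consists only of a `--` comment."""
    kept = [line for line in sql.splitlines()
            if not line.lstrip().startswith("--")]
    return "\n".join(kept)


# Shortened, made-up sample in the style of the script above.
sql = """set hive.exec.dynamic.partition=true;
-- will be replaced ;
-- insert into aipdc.device_accumulate_stage(domoy, newdevice_cnt, device_cnt, speech_source);
insert into aipdc.device_accumulate_stage
select 1;"""
print(strip_sql_comment_lines(sql))
```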