Apache Airflow Research - mozzihozzi/DevOps GitHub Wiki
Apache Airflow
Apache Airflow ์กฐ์ฌ
- ์๊ฐ
- ์ค์น๋ฐฉ๋ฒ
- ์ฌ์ฉ๊ฐ์ด๋
- ์ปค๋งจ๋ ์ ๋ฆฌ
1. ์๊ฐ
1.1. Apache airflow ๋?
- ์คํ์์ค ์ํฌํ๋ก์ฐ ์ค์ผ์ค๋ง, ๋ชจ๋ํฐ๋ง ํ๋ซํผ
- Airbnb์์ ๊ฐ๋ฐ
- ์ฃผ๋ก ๋น ๋ฐ์ดํฐ์์ ๋ฐ์ดํฐ ํ๋ก์ธ์ค๋ฅผ ๊ด๋ฆฌํ๊ธฐ ์ํด ์ฌ์ฉํจ
- Python ์ธ์ด๋ก ์ฌ์ฉ
1.2. ํน์ง
1. ๋ณต์กํ ํ๋ก์ธ์ค๋ฅผ flow diagram ํํ๋ก ํ์ธ ๊ฐ๋ฅ
2. Python์ผ๋ก task๋ฅผ ์์ฑ ๋ฐ ๊ด๋ฆฌ + ์ด์๊ณ ํธ๋ฆฌํ UI
3. ๊ฐ task์ ์คํ ์๊ฐ, ์ด๋ ฅ์ ์ฝ๊ฒ ํ์ธ ๊ฐ๋ฅ
4. ํ์ํ ๊ฒฝ์ฐ ํน์ task๋ง ์คํ ๊ฐ๋ฅ
5. ๊ฐ task๋ฅผ ๋ณ๋ ฌ๋ก ์คํ ๊ฐ๋ฅ
6. ๋
๋ฆฝ๋ ์์ฒด ์ค์ผ์ฅด๋ฌ๋ก ๊ฐ ์ฌ์ฉ์๊ฐ ๋
๋ฆฝ์ ์ผ๋ก ์ํ ๊ฐ๋ฅ
์ข ํฉํ๋ฉด, ์ฌ์ฉ์๊ฐ workflow๋ฅผ ์ฝ๊ฒ ๊ด๋ฆฌ ๋ฐ ๋ชจ๋ํฐ๋ง ํ ์ ์์
1.3. dag(Directed Acyclic Graph, ๋ฐฉํฅ์ฑ ๋น์ํ ๊ทธ๋ํ)
- workflow๋ฅผ ๊ตฌ์ฑํ๋ task๋ค์ ์งํฉ๋ค์ dag๋ผ๊ณ ํจ
- dag๋ ํ์ชฝ ๋ฐฉํฅ์ผ๋ก๋ง ์งํ๋๊ณ ์ํ๋์ง ์๋ ๊ทธ๋ํ๋ฅผ ์๋ฏธํจ
- ์ฌ๋ฌ task๋ค์ด ์์์ ์ข ์์ฑ์ ๊ฐ์ง
- dag์์์ task๋ค์ Operator๋ก ๋ถํฐ ์ ์ํ๊ณ Operator๋ค์ ์ฌ๋ฌ ์ข ๋ฅ๋ค์ด ์์
1.4. ๊ธฐ๋ณธ ํด๋ ๊ตฌ์กฐ
์ค์น๋ airflow ๋๋ ํฐ๋ฆฌ์ airflow.cfg, airflow.db๋ฅผ ํตํด ์ค์ ์ ๋ณ๊ฒฝํ๊ฑฐ๋ dag, log๋ค์ด ์ ์ฅ๋๋ค.
airflow
โโโ airflow.cfg
โโโ airflow.db
โโโ dags
โ โโโ dags1.py
โ โโโ dags2.py
โ โโโ ...
โโโ logs
โโโ ...
2. ์ค์น ๋ฐฉ๋ฒ
2.1. Prerequisites
- python
- pip
- venv(optional)
2.2. ์ค์น ๊ณผ์
apache-airflow ์ค์น, default๋ก ~/airflow ๊ฒฝ๋ก์ ์ค์นํจ ์ค์น ๊ฒฝ๋ก๋ฅผ ๋ฐ๊พธ๊ณ ์ถ์ผ๋ฉด export AIRFLOW_HOME=~/{path} ์ผ๋ก ์์ ํ ์ค์น gcp(google cloud platform) or postgres์์ ์ค์นํ ๊ฒฝ์ฐ์๋ pip install apache-airflow[postgres, gcp]๋ฅผ ์ฌ์ฉ
pip install apache-airflow
conda๋ก ์ค์นํ ๊ฒฝ์ฐ ์๋์ command๋ฅผ ์ฌ์ฉ
conda install -c conda-forge airflow
airflow ๋ฒ์ ํ์ธ
airflow version
airflow์์ ์ฌ์ฉํ DB ์ด๊ธฐํ airflow.cfg์์ DB์ค์ ์ ํ์ง ์์ผ๋ฉด default๋ก SQLite ์ฌ์ฉ
airflow initdb
UI๋ฅผ ์คํํ ์น์๋ฒ ์คํ, default ํฌํธ๋ 8080, url์ http://localhost:{port}
airflow webserver -p {port}
airflow ์ค์ผ์ฅด๋ฌ ์คํ
airflow scheduler
3. ์ฌ์ฉ ๊ฐ์ด๋
- Airflow Documentation
3.1. dag ์์ฑ
airflow๋ Operator๋ฅผ ํตํด task๋ฅผ ์ ์ํจ. Operator๋ ๋ค์ํ ์ข ๋ฅ๊ฐ ์์.
- DummyOperator
- BashOperator
- PythonOperator
- Dining Operators
- Google Cloud Operators
- Papermill
3.1.1. PythonOperator
PythonOperator parameters
- task_id : ๊ฐ task๋ฅผ ๊ตฌ๋ถํ๊ธฐ ์ํ task id, ์ด๋ฆ์ Uniqueํด์ผ ํจ
- python_calllable :์ค์ ํธ์ถ๋ python ํจ์ ์ด๋ฆ
- provide_context : python ํจ์ ํธ์ถ ์ ํด๋น ํจ์์์ ์ฌ์ฉ๋ ์ ์๋ ๊ธฐ๋ณธ์ ์ธ argument ๊ฐ์ ๋๊ฒจ์ค ์ง ์ฌ๋ถ
- op_kwargs : ๊ธฐ๋ณธ argument ์ธ์ ์ถ๊ฐ๋ก ๋๊ฒจ์ค parameter ์ ์
- dag : default dag ์ด๋ฆ, ๋ณดํต dag๋ฅผ ์จ์ค
dag ์์ฑ ์์
from airflow import DAG
from airflow.operators.dummy_operatorimport DummyOperator
from airflow.operators.python_operatorimport PythonOperator
def python_task1():
...
def python_task2():
...
dag= DAG(โ{dag_filename}', description='Simple tutorial DAG', schedule_interval='0 12 * * *', start_date=datetime(2019, 1, 20), catchup=False)
dummy_op1 = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
python_op1 = PythonOperator(task_id='python_task1', python_callable={function_name}, dag=dag)
python_op2 = PythonOperator(task_id='python_task2', python_callable={function_name}, dag=dag)
dummy_op1 >> [hello_op1, hello_op2]
3.2. Execution_date
3.3. Commands