Apache Airflow
Apache Airflow๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ๊ด๋ฆฌํ๊ณ ์ค์ผ์ค๋งํ๊ธฐ ์ํ ์คํ ์์ค ํ๋ซํผ์ด๋ค. Airflow๋ ํ์ด์ฌ ์ฝ๋๋ฅผ ์ด์ฉํด ํ์ดํ๋ผ์ธ์ ๊ตฌํํ ์ ์๊ธฐ์ ํ์ด์ฌ ์ธ์ด๋ก ๊ตฌํํ ์ ์๋ ๋๋ถ๋ถ์ ๋ฐฉ๋ฒ์ ์ฌ์ฉํ์ฌ ์ฌ๋ฌ ์ปค์คํ ํ์ดํ๋ผ์ธ์ ๋ง๋ค ์ ์๋ค. ๋ํ ์ฝ๊ฒ ํ์ฅ ๊ฐ๋ฅํ๊ณ ๋ค์ํ ์์คํ ๊ณผ ํตํฉ์ด ๊ฐ๋ฅํ๋ค. ์๋ง์ ์ค์ผ์ค๋ง ๊ธฐ๋ฒ์ผ๋ก ํ์ดํ๋ผ์ธ์ ์ ๊ธฐ์ ์ผ๋ก ์คํํ๊ณ ์ ์ง์ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํ๊ณ ์คํ ์์ค๋ผ๋ ์ฅ์ ์ด ์๊ธฐ ๋๋ฌธ์ ๋ง์ ๊ธฐ์ ์์ Airflow๋ฅผ ์ฌ์ฉํ๊ณ ์๋ค.
(๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์
์ ์กฐ์งํ๊ณ ์คํํ๊ธฐ ์ํ ์ผ๋ จ์ ๋จ๊ณ ๋ฐ ํ๋ก์ธ์ค)
Apache Airflow์ ํน์ง
- ์ค์ผ์ค๋ง๊ณผ ๋ชจ๋ํฐ๋ง
- Airflow๋ ์์ ์ ์ค์ผ์ค๋งํ๊ณ ๊ฐ์ํ๋ ๋ฐ ์ฌ์ฉ๋จ
- ์์ฝ๋ ์์ ์ DAG (Directed Acyclic Graph)๋ผ๋ ๋ฐฉํฅ์ฑ์ด ์๋ ๋น์ํ ๊ทธ๋ํ๋ก ์ ์๋จ
- ์ ์ฐํ ์์
์ ์
- Python์ผ๋ก ์์ฑ๋ ์ฝ๋๋ฅผ ์ฌ์ฉํ์ฌ ์์ ์ ์ ์ํ๋ฏ๋ก ๋งค์ฐ ์ ์ฐ
- ์ฌ์ฉ์๋ ๋ค์ํ ์ ํ์ ์์ ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์ ์ ์ ์ํ ์ ์์
- ๋ชจ๋์ฑ๊ณผ ์ฌ์ฌ์ฉ์ฑ
- Airflow์์๋ ์์ ์ ์ฌ์ฌ์ฉ ๊ฐ๋ฅํ ๋ ๋ฆฝ์ ์ธ ์ ๋์ผ๋ก ์ ์ํ ์ ์์
- ์ด๋ ์ฝ๋์ ๋ชจ๋์ฑ์ ์ฆ๊ฐ์ํค๊ณ ์ ์ง ๋ณด์๋ฅผ ์ฉ์ดํ๊ฒ ํจ
- ๋์ ํ์ฅ์ฑ
- Airflow๋ ๋์ ์ผ๋ก ์์ ์ ํ์ฅํ ์ ์๋ ๊ธฐ๋ฅ์ ์ ๊ณต
- ์๋ก์ด ์์ ์ด๋ ๊ธฐ๋ฅ์ ์ถ๊ฐํ๊ฑฐ๋ ์์ ํ ๋ ์์คํ ์ ์ค์ง์ํค์ง ์๊ณ ๋ ๋ณ๊ฒฝ ์ฌํญ์ ๋ฐ์ํ ์ ์์
DAG (Directed Acyclic Graph)
DAG๋ Directed Acyclic Graph์ ์ฝ์๋ก, ๋ฐฉํฅ์ฑ์ด ์๋ ๋น์ํ ๊ทธ๋ํ๋ฅผ ๋ํ๋ธ๋ค. Airflow์์ DAG๋ ์์
์ ํ๋ฆ์ด๋ ์์กด์ฑ์ ์ ์ํ๋๋ฐ ์ฌ์ฉ๋๋๋ฐ, ์ฌ๋ฌ ์์
๋ค ๊ฐ์ ์คํ ์์์ ์์กด์ฑ์ ํํํ๋ ๊ทธ๋ํ๋ผ๊ณ ๋ณด๋ฉด ๋๋ค.
Airflow์์ DAG๋ Python ์คํฌ๋ฆฝํธ๋ก ์ ์๋๋ฉฐ, ์ด ์คํฌ๋ฆฝํธ๋ ์์
๋ค ๊ฐ์ ์์กด์ฑ ๋ฐ ์คํ ์ค์ผ์ค์ ๋ช
์ํ๋ค. DAG ์ ์๋ ์ฃผ๋ก ๋ค์๊ณผ ๊ฐ์ ๊ตฌ์ฑ ์์๋ก ์ด๋ฃจ์ด์ง๋ค.
- DAG ๊ฐ์ฒด : DAG ํด๋์ค์ ์ธ์คํด์ค๋ก, ์์ ์ ํ๋ฆ๊ณผ ์ค์ผ์ค์ ์ ์
- ์์ (Task) : DAG ๋ด์์ ์ํ๋์ด์ผ ํ๋ ๊ฐ๊ฐ์ ๋จ์ ์์ . ์์ ์ PythonOperator, BashOperator, Python ํจ์ ๋ฑ์ผ๋ก ์ ์.
- ์์กด์ฑ (Dependencies) : ์์ ๊ฐ์ ์์กด์ฑ. ์ฆ, ์ด๋ค ์์ ์ ๋ค๋ฅธ ์์ ์ด ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋ ํ์๋ง ์คํ๋ ์ ์๋๋ก ์ค์ ํ ์ ์์.
- ์ค์ผ์ค (Schedule) : DAG ๋ด์ ์์ ์ด ์คํ๋๋ ์ฃผ๊ธฐ์ ์ธ ์ค์ผ์ค์ ์ ์. ์๋ฅผ ๋ค์ด, ๋งค์ผ, ๋งค์ฃผ ํน์ ์์ผ ๋ฑ์ผ๋ก ์ค์ผ์คํ ์ ์์.
์ฝ๋ ์์
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# DAG ์ ์
dag = DAG(
'my_dag',
description='My example DAG',
schedule_interval=timedelta(days=1), # ๋งค์ผ ์คํ
start_date=datetime(2023, 1, 1),
catchup=False, # ๊ณผ๊ฑฐ ์คํ์์ ๋๋ฝ๋ ์์
์ ์ฌ์คํํ์ง ์์
)
# ์์
์ ์
def task1():
print("Task 1")
def task2():
print("Task 2")
# DAG์ ์์
์ถ๊ฐ
t1 = PythonOperator(
task_id='task1',
python_callable=task1,
dag=dag,
)
t2 = PythonOperator(
task_id='task2',
python_callable=task2,
dag=dag,
)
# ์์กด์ฑ ์ ์
t1 >> t2
- ์ ์์ ์์ t2๋ t1์ด ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋ ํ์๋ง ์คํ๋จ
- 'schedule_interval'์ DAG๊ฐ ์ผ๋ง๋ ์์ฃผ ์คํ๋ ์ง๋ฅผ ์ ์
- 'start_date'๋ DAG์ ์ต์ด ์คํ ๋ ์ง๋ฅผ ์ ์