Apache Airflow์์ DAG์ ์ผ์ ํ ๊ฐ๊ฒฉ์ผ๋ก ์คํํ๋ ค๋ฉด schedule_interval ๋งค๊ฐ๋ณ์๋ฅผ ์ฌ์ฉํ๋ค. ์ด ๋งค๊ฐ๋ณ์๋ DAG์ด ์คํ๋ ์ฃผ๊ธฐ๋ฅผ ๋ํ๋ด๊ณ , ์ฃผ๊ธฐ๋ timedelta ๊ฐ์ฒด๋ก ์ ์๋๋ค. ์๋ฅผ ๋ค์ด, ๋งค์ผ ์คํํ๋ ค๋ฉด timedelta(days=1)๊ณผ ๊ฐ์ด ์ ์ํ ์ ์๋ค.
timedelta๋ฅผ ์ฌ์ฉํ๋ ๋ช ๊ฐ์ง ์์ ๋ฅผ ์ดํด๋ณด์.
timedelta
# ๋งค์ผ ์คํ
schedule_interval=timedelta(days=1)
# 3์ผ๋ง๋ค ์คํ
schedule_interval=timedelta(days=3)
# ๋งค์ฃผ ์์์ผ ์คํ
schedule_interval=timedelta(weeks=1, days=1)
# ๋งค์๊ฐ ์คํ
schedule_interval=timedelta(hours=1)
# ๋งค๋ถ ์คํ
schedule_interval=timedelta(minutes=1)
์ค์ผ์ค ํ๋ฆฌ์
Apache Airflow์์๋ DAG์ schedule_interval์ ๋ฏธ๋ฆฌ ์ ์๋ ์ค์ผ์ค ํ๋ฆฌ์ ์ ์ฌ์ฉํ ์๋ ์๋ค. ์ด๋ฌํ ํ๋ฆฌ์ ์ ์ฃผ๋ก ๊ฐ๋จํ ํํ์ ์ฃผ๊ธฐ๋ฅผ ์ฝ๊ฒ ์ ์ํ ์ ์๋๋ก ๋์์ค๋ค.
# ํ ๋ฒ๋ง ์คํ
schedule_interval='@once'
# ๋งค ์๊ฐ ์คํ
schedule_interval='@hourly'
# ๋งค์ผ ์์ ์ ์คํ
schedule_interval='@daily'
# ๋งค์ฃผ ์ผ์์ผ ์์ ์ ์คํ
schedule_interval='@weekly'
# ๋งค์ 1์ผ ์์ ์ ์คํ
schedule_interval='@monthly'
# ๋งค๋
1์ 1์ผ ์์ ์ ์คํ
schedule_interval='@yearly'
Cron ๊ธฐ๋ฐ ์ค์ผ์ค ๊ฐ๊ฒฉ ์ค์
์์ ์ดํด๋ณธ ์์๋ณด๋ค ๋ ๋ํ ์ผํ ์ค์ผ์ค ๊ฐ๊ฒฉ์ ์ค์ ํ๊ณ ์ถ์ ๋๋ cron๊ณผ ๋์ผํ ๊ตฌ๋ฌธ์ ์ฌ์ฉํด ์ค์ผ์ค ๊ฐ๊ฒฉ์ ์ ์ํ ์ ์๋ค. ์๋ฅผ ๋ค์ด ๋งค์ฃผ ๊ธ์์ผ 10์ 30๋ถ์ DAG๋ฅผ ์คํํ๊ณ ์ถ์ ๊ฒฝ์ฐ์ ๋ง์ด๋ค.
cron ๊ตฌ๋ฌธ์ 5๊ฐ์ ๊ตฌ์ฑ ์์๊ฐ ์์ผ๋ฉฐ ๋ค์๊ณผ ๊ฐ์ด ์ ์ ๋๋ค.
- * * * * * (๋ถ(0~59), ์๊ฐ(0~23), ์ผ(1~31), ์(1~12), ์์ผ(0~6)(์ผ~ํ ))
- ์ ์ ์์์ cron job์ ์๊ฐ๊ณผ ๋ ์ง๊ฐ ํด๋น ํ๋์ ๊ฐ๊ณผ ์์คํ ์๊ฐ์ด ์ผ์นํ ๋ ์คํ๋จ
- ์ซ์ ๋์ ์ ์คํฐ๋ฆฌ์คํฌ(*)๋ก ์ ํ๋์ง ์์ ํ๋๋ก ์ ์ํ์ฌ ์ ๊ฒฝ์ฐ์ง ์๋๋ค๊ณ ํ์ํ ์ ์์
- ์์
- 0 * * * * = ๋งค์๊ฐ (์ ์์ ์คํ)
- 0 0 * * * = ๋งค์ผ (์์ ์ ์คํ)
- 0 0 * * 0 = ๋งค์ฃผ (์ผ์์ผ ์์ ์ ์คํ)
- 30 22 * * SAT = ๋งค์ฃผ ํ ์์ผ 22์ 30๋ถ์ ์คํ
๋ค์ ๋ณต์กํ๊ณ ํท๊ฐ๋ฆฌ๊ธด ํ์ง๋ง cron ๊ตฌ๋ฌธ์ ์ฌ์ฉํ๋ฉด ๋ํ ์ผํ ์๊ฐ ๊ฐ๊ฒฉ์ ์ ์ํ ๋ ๊ต์ฅํ ์ ์ฉํ๋ค.
DAG ์คํ ์์
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# DAG ์ ์
dag = DAG(
'my_daily_dag',
description='My daily DAG example',
schedule_interval=timedelta(days=1), # ๋งค์ผ ์คํ
start_date=datetime(2023, 1, 1),
catchup=False, # ๊ณผ๊ฑฐ ์คํ์์ ๋๋ฝ๋ ์์
์ ์ฌ์คํํ์ง ์์
)
# ์์
์ ์
def my_python_function():
print("Executing my Python function")
# DAG์ ์์
์ถ๊ฐ
my_task = PythonOperator(
task_id='my_task',
python_callable=my_python_function,
dag=dag,
)
- ์ด DAG์ schedule_interval=timedelta(days=1)๋ก ์ค์ ๋์ด ์์ผ๋ฏ๋ก ๋งค์ผ ์์ ์ ์คํ๋จ
- start_date๋ DAG์ ์ต์ด ์คํ ๋ ์ง๋ฅผ ์ ์
- catchup=False๋ ๊ณผ๊ฑฐ์ ์คํ์์ ๋๋ฝ๋ ์์ ์ ์ฌ์คํํ์ง ์๋๋ก ์ค์