๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
๐Ÿ’ป Programming/Apache Airflow

[Airflow] ์ผ์ •ํ•œ ๊ฐ„๊ฒฉ์œผ๋กœ DAG ์‹คํ–‰ํ•˜๊ธฐ (์Šค์ผ€์ค„๋ง) | schedule_interval | cron ๊ธฐ๋ฐ˜ ์Šค์ผ€์ค„

by ๋ญ…์ฆค 2023. 11. 19.
๋ฐ˜์‘ํ˜•


Apache Airflow์—์„œ DAG์„ ์ผ์ •ํ•œ ๊ฐ„๊ฒฉ์œผ๋กœ ์‹คํ–‰ํ•˜๋ ค๋ฉด schedule_interval ๋งค๊ฐœ๋ณ€์ˆ˜๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค. ์ด ๋งค๊ฐœ๋ณ€์ˆ˜๋Š” DAG์ด ์‹คํ–‰๋  ์ฃผ๊ธฐ๋ฅผ ๋‚˜ํƒ€๋‚ด๊ณ , ์ฃผ๊ธฐ๋Š” timedelta ๊ฐ์ฒด๋กœ ์ •์˜๋œ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ๋งค์ผ ์‹คํ–‰ํ•˜๋ ค๋ฉด timedelta(days=1)๊ณผ ๊ฐ™์ด ์ •์˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

 

timedelta๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๋ช‡ ๊ฐ€์ง€ ์˜ˆ์ œ๋ฅผ ์‚ดํŽด๋ณด์ž.

 

timedelta

# ๋งค์ผ ์‹คํ–‰
schedule_interval=timedelta(days=1)

# 3์ผ๋งˆ๋‹ค ์‹คํ–‰
schedule_interval=timedelta(days=3)

# ๋งค์ฃผ ์›”์š”์ผ ์‹คํ–‰
schedule_interval=timedelta(weeks=1, days=1) 

# ๋งค์‹œ๊ฐ„ ์‹คํ–‰
schedule_interval=timedelta(hours=1)

# ๋งค๋ถ„ ์‹คํ–‰
schedule_interval=timedelta(minutes=1)

 

 

 

์Šค์ผ€์ค„ ํ”„๋ฆฌ์…‹

Apache Airflow์—์„œ๋Š” DAG์˜ schedule_interval์— ๋ฏธ๋ฆฌ ์ •์˜๋œ ์Šค์ผ€์ค„ ํ”„๋ฆฌ์…‹์„ ์‚ฌ์šฉํ•  ์ˆ˜๋„ ์žˆ๋‹ค. ์ด๋Ÿฌํ•œ ํ”„๋ฆฌ์…‹์€ ์ฃผ๋กœ ๊ฐ„๋‹จํ•œ ํ˜•ํƒœ์˜ ์ฃผ๊ธฐ๋ฅผ ์‰ฝ๊ฒŒ ์ •์˜ํ•  ์ˆ˜ ์žˆ๋„๋ก ๋„์™€์ค€๋‹ค. 

# ํ•œ ๋ฒˆ๋งŒ ์‹คํ–‰
schedule_interval='@once'

# ๋งค ์‹œ๊ฐ„ ์‹คํ–‰
schedule_interval='@hourly'

# ๋งค์ผ ์ž์ •์— ์‹คํ–‰
schedule_interval='@daily'

# ๋งค์ฃผ ์ผ์š”์ผ ์ž์ •์— ์‹คํ–‰
schedule_interval='@weekly'

# ๋งค์›” 1์ผ ์ž์ •์— ์‹คํ–‰
schedule_interval='@monthly'

# ๋งค๋…„ 1์›” 1์ผ ์ž์ •์— ์‹คํ–‰
schedule_interval='@yearly'

 

 

Cron ๊ธฐ๋ฐ˜ ์Šค์ผ€์ค„ ๊ฐ„๊ฒฉ ์„ค์ •

์•ž์„œ ์‚ดํŽด๋ณธ ์˜ˆ์‹œ๋ณด๋‹ค ๋” ๋””ํ…Œ์ผํ•œ ์Šค์ผ€์ค„ ๊ฐ„๊ฒฉ์„ ์„ค์ •ํ•˜๊ณ  ์‹ถ์„ ๋•Œ๋Š” cron๊ณผ ๋™์ผํ•œ ๊ตฌ๋ฌธ์„ ์‚ฌ์šฉํ•ด ์Šค์ผ€์ค„ ๊ฐ„๊ฒฉ์„ ์ •์˜ํ•  ์ˆ˜ ์žˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ๋งค์ฃผ ๊ธˆ์š”์ผ 10์‹œ 30๋ถ„์— DAG๋ฅผ ์‹คํ–‰ํ•˜๊ณ  ์‹ถ์€ ๊ฒฝ์šฐ์— ๋ง์ด๋‹ค.

 

cron ๊ตฌ๋ฌธ์€ 5๊ฐœ์˜ ๊ตฌ์„ฑ ์š”์†Œ๊ฐ€ ์žˆ์œผ๋ฉฐ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ •์˜ ๋œ๋‹ค.

  • * * * * * (๋ถ„(0~59), ์‹œ๊ฐ„(0~23), ์ผ(1~31), ์›”(1~12), ์š”์ผ(0~6)(์ผ~ํ† ))
  • ์œ„ ์ •์˜์—์„œ cron job์€ ์‹œ๊ฐ„๊ณผ ๋‚ ์งœ๊ฐ€ ํ•ด๋‹น ํ•„๋“œ์˜ ๊ฐ’๊ณผ ์‹œ์Šคํ…œ ์‹œ๊ฐ„์ด ์ผ์น˜ํ•  ๋•Œ ์‹คํ–‰๋จ
  • ์ˆซ์ž ๋Œ€์‹  ์• ์Šคํ„ฐ๋ฆฌ์Šคํฌ(*)๋กœ ์ œํ•œ๋˜์ง€ ์•Š์€ ํ•„๋“œ๋กœ ์ •์˜ํ•˜์—ฌ ์‹ ๊ฒฝ์“ฐ์ง€ ์•Š๋Š”๋‹ค๊ณ  ํ‘œ์‹œํ•  ์ˆ˜ ์žˆ์Œ
  • ์˜ˆ์‹œ
    • 0 * * * * = ๋งค์‹œ๊ฐ„ (์ •์‹œ์— ์‹คํ–‰)
    • 0 0 * * * = ๋งค์ผ (์ž์ •์— ์‹คํ–‰)
    • 0 0  * * 0 = ๋งค์ฃผ (์ผ์š”์ผ ์ž์ •์— ์‹คํ–‰)
    • 30 22 * * SAT = ๋งค์ฃผ ํ† ์š”์ผ 22์‹œ 30๋ถ„์— ์‹คํ–‰ 

 

๋‹ค์†Œ ๋ณต์žกํ•˜๊ณ  ํ—ท๊ฐˆ๋ฆฌ๊ธด ํ•˜์ง€๋งŒ cron ๊ตฌ๋ฌธ์„ ์‚ฌ์šฉํ•˜๋ฉด ๋””ํ…Œ์ผํ•œ ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์„ ์ •์˜ํ•  ๋•Œ ๊ต‰์žฅํžˆ ์œ ์šฉํ•˜๋‹ค.



DAG ์‹คํ–‰ ์˜ˆ์ œ 

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# DAG ์ •์˜
dag = DAG(
    'my_daily_dag',
    description='My daily DAG example',
    schedule_interval=timedelta(days=1),  # ๋งค์ผ ์‹คํ–‰
    start_date=datetime(2023, 1, 1),
    catchup=False,  # ๊ณผ๊ฑฐ ์‹คํ–‰์—์„œ ๋ˆ„๋ฝ๋œ ์ž‘์—…์„ ์žฌ์‹คํ–‰ํ•˜์ง€ ์•Š์Œ
)

# ์ž‘์—… ์ •์˜
def my_python_function():
    print("Executing my Python function")

# DAG์— ์ž‘์—… ์ถ”๊ฐ€
my_task = PythonOperator(
    task_id='my_task',
    python_callable=my_python_function,
    dag=dag,
)
  • ์ด DAG์€ schedule_interval=timedelta(days=1)๋กœ ์„ค์ •๋˜์–ด ์žˆ์œผ๋ฏ€๋กœ ๋งค์ผ ์ž์ •์— ์‹คํ–‰๋จ
  • start_date๋Š” DAG์˜ ์ตœ์ดˆ ์‹คํ–‰ ๋‚ ์งœ๋ฅผ ์ •์˜
  • catchup=False๋Š” ๊ณผ๊ฑฐ์˜ ์‹คํ–‰์—์„œ ๋ˆ„๋ฝ๋œ ์ž‘์—…์„ ์žฌ์‹คํ–‰ํ•˜์ง€ ์•Š๋„๋ก ์„ค์ •
๋ฐ˜์‘ํ˜•