๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
๐Ÿ’ป Programming/Apache Airflow

[Airflow] Airflow & DAG ์„ค๋ช…

by ๋ญ…์ฆค 2023. 11. 19.
๋ฐ˜์‘ํ˜•
Apache Airflow

 

Apache Airflow๋Š” ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์„ ๊ด€๋ฆฌํ•˜๊ณ  ์Šค์ผ€์ค„๋งํ•˜๊ธฐ ์œ„ํ•œ ์˜คํ”ˆ ์†Œ์Šค ํ”Œ๋žซํผ์ด๋‹ค. Airflow๋Š” ํŒŒ์ด์ฌ ์ฝ”๋“œ๋ฅผ ์ด์šฉํ•ด ํŒŒ์ดํ”„๋ผ์ธ์„ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๊ธฐ์— ํŒŒ์ด์ฌ ์–ธ์–ด๋กœ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋Š” ๋Œ€๋ถ€๋ถ„์˜ ๋ฐฉ๋ฒ•์„ ์‚ฌ์šฉํ•˜์—ฌ ์—ฌ๋Ÿฌ ์ปค์Šคํ…€ ํŒŒ์ดํ”„๋ผ์ธ์„ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋‹ค. ๋˜ํ•œ ์‰ฝ๊ฒŒ ํ™•์žฅ ๊ฐ€๋Šฅํ•˜๊ณ  ๋‹ค์–‘ํ•œ ์‹œ์Šคํ…œ๊ณผ ํ†ตํ•ฉ์ด ๊ฐ€๋Šฅํ•˜๋‹ค. ์ˆ˜๋งŽ์€ ์Šค์ผ€์ค„๋ง ๊ธฐ๋ฒ•์œผ๋กœ ํŒŒ์ดํ”„๋ผ์ธ์„ ์ •๊ธฐ์ ์œผ๋กœ ์‹คํ–‰ํ•˜๊ณ  ์ ์ง„์  ์ฒ˜๋ฆฌ๊ฐ€ ๊ฐ€๋Šฅํ•˜๊ณ  ์˜คํ”ˆ ์†Œ์Šค๋ผ๋Š” ์žฅ์ ์ด ์žˆ๊ธฐ ๋•Œ๋ฌธ์— ๋งŽ์€ ๊ธฐ์—…์—์„œ Airflow๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ๋‹ค.

(๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์€ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์ž‘์—…์„ ์กฐ์งํ•˜๊ณ  ์‹คํ–‰ํ•˜๊ธฐ ์œ„ํ•œ ์ผ๋ จ์˜ ๋‹จ๊ณ„ ๋ฐ ํ”„๋กœ์„ธ์Šค)

Apache Airflow์˜ ํŠน์ง•

  • ์Šค์ผ€์ค„๋ง๊ณผ ๋ชจ๋‹ˆํ„ฐ๋ง
    • Airflow๋Š” ์ž‘์—…์„ ์Šค์ผ€์ค„๋งํ•˜๊ณ  ๊ฐ์‹œํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋จ
    • ์˜ˆ์•ฝ๋œ ์ž‘์—…์€ DAG (Directed Acyclic Graph)๋ผ๋Š” ๋ฐฉํ–ฅ์„ฑ์ด ์žˆ๋Š” ๋น„์ˆœํ™˜ ๊ทธ๋ž˜ํ”„๋กœ ์ •์˜๋จ
  • ์œ ์—ฐํ•œ ์ž‘์—… ์ •์˜
    • Python์œผ๋กœ ์ž‘์„ฑ๋œ ์ฝ”๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ž‘์—…์„ ์ •์˜ํ•˜๋ฏ€๋กœ ๋งค์šฐ ์œ ์—ฐ
    • ์‚ฌ์šฉ์ž๋Š” ๋‹ค์–‘ํ•œ ์œ ํ˜•์˜ ์ž‘์—… ๋ฐ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์ž‘์—…์„ ์ •์˜ํ•  ์ˆ˜ ์žˆ์Œ
  • ๋ชจ๋“ˆ์„ฑ๊ณผ ์žฌ์‚ฌ์šฉ์„ฑ
    • Airflow์—์„œ๋Š” ์ž‘์—…์„ ์žฌ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ๋…๋ฆฝ์ ์ธ ์œ ๋‹›์œผ๋กœ ์ •์˜ํ•  ์ˆ˜ ์žˆ์Œ
    • ์ด๋Š” ์ฝ”๋“œ์˜ ๋ชจ๋“ˆ์„ฑ์„ ์ฆ๊ฐ€์‹œํ‚ค๊ณ  ์œ ์ง€ ๋ณด์ˆ˜๋ฅผ ์šฉ์ดํ•˜๊ฒŒ ํ•จ
  • ๋™์  ํ™•์žฅ์„ฑ
    • Airflow๋Š” ๋™์ ์œผ๋กœ ์ž‘์—…์„ ํ™•์žฅํ•  ์ˆ˜ ์žˆ๋Š” ๊ธฐ๋Šฅ์„ ์ œ๊ณต
    • ์ƒˆ๋กœ์šด ์ž‘์—…์ด๋‚˜ ๊ธฐ๋Šฅ์„ ์ถ”๊ฐ€ํ•˜๊ฑฐ๋‚˜ ์ˆ˜์ •ํ•  ๋•Œ ์‹œ์Šคํ…œ์„ ์ค‘์ง€์‹œํ‚ค์ง€ ์•Š๊ณ ๋„ ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ๋ฐ˜์˜ํ•  ์ˆ˜ ์žˆ์Œ

 

DAG (Directed Acyclic Graph)

DAG๋Š” Directed Acyclic Graph์˜ ์•ฝ์ž๋กœ, ๋ฐฉํ–ฅ์„ฑ์ด ์žˆ๋Š” ๋น„์ˆœํ™˜ ๊ทธ๋ž˜ํ”„๋ฅผ ๋‚˜ํƒ€๋‚ธ๋‹ค. Airflow์—์„œ DAG๋Š” ์ž‘์—…์˜ ํ๋ฆ„์ด๋‚˜ ์˜์กด์„ฑ์„ ์ •์˜ํ•˜๋Š”๋ฐ ์‚ฌ์šฉ๋˜๋Š”๋ฐ, ์—ฌ๋Ÿฌ ์ž‘์—…๋“ค ๊ฐ„์˜ ์‹คํ–‰ ์ˆœ์„œ์™€ ์˜์กด์„ฑ์„ ํ‘œํ˜„ํ•˜๋Š” ๊ทธ๋ž˜ํ”„๋ผ๊ณ  ๋ณด๋ฉด ๋œ๋‹ค.

Airflow์—์„œ DAG๋Š” Python ์Šคํฌ๋ฆฝํŠธ๋กœ ์ •์˜๋˜๋ฉฐ, ์ด ์Šคํฌ๋ฆฝํŠธ๋Š” ์ž‘์—…๋“ค ๊ฐ„์˜ ์˜์กด์„ฑ ๋ฐ ์‹คํ–‰ ์Šค์ผ€์ค„์„ ๋ช…์‹œํ•œ๋‹ค. DAG ์ •์˜๋Š” ์ฃผ๋กœ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ตฌ์„ฑ ์š”์†Œ๋กœ ์ด๋ฃจ์–ด์ง„๋‹ค.

 

  • DAG ๊ฐ์ฒด : DAG ํด๋ž˜์Šค์˜ ์ธ์Šคํ„ด์Šค๋กœ, ์ž‘์—…์˜ ํ๋ฆ„๊ณผ ์Šค์ผ€์ค„์„ ์ •์˜
  • ์ž‘์—… (Task) : DAG ๋‚ด์—์„œ ์ˆ˜ํ–‰๋˜์–ด์•ผ ํ•˜๋Š” ๊ฐ๊ฐ์˜ ๋‹จ์œ„ ์ž‘์—…. ์ž‘์—…์€ PythonOperator, BashOperator, Python ํ•จ์ˆ˜ ๋“ฑ์œผ๋กœ ์ •์˜.
  • ์˜์กด์„ฑ (Dependencies) : ์ž‘์—… ๊ฐ„์˜ ์˜์กด์„ฑ. ์ฆ‰, ์–ด๋–ค ์ž‘์—…์€ ๋‹ค๋ฅธ ์ž‘์—…์ด ์„ฑ๊ณต์ ์œผ๋กœ ์™„๋ฃŒ๋œ ํ›„์—๋งŒ ์‹คํ–‰๋  ์ˆ˜ ์žˆ๋„๋ก ์„ค์ •ํ•  ์ˆ˜ ์žˆ์Œ.
  • ์Šค์ผ€์ค„ (Schedule) : DAG ๋‚ด์˜ ์ž‘์—…์ด ์‹คํ–‰๋˜๋Š” ์ฃผ๊ธฐ์ ์ธ ์Šค์ผ€์ค„์„ ์ •์˜. ์˜ˆ๋ฅผ ๋“ค์–ด, ๋งค์ผ, ๋งค์ฃผ ํŠน์ • ์š”์ผ ๋“ฑ์œผ๋กœ ์Šค์ผ€์ค„ํ•  ์ˆ˜ ์žˆ์Œ.

 

์ฝ”๋“œ ์˜ˆ์‹œ

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# DAG ์ •์˜
dag = DAG(
    'my_dag',
    description='My example DAG',
    schedule_interval=timedelta(days=1),  # ๋งค์ผ ์‹คํ–‰
    start_date=datetime(2023, 1, 1),
    catchup=False,  # ๊ณผ๊ฑฐ ์‹คํ–‰์—์„œ ๋ˆ„๋ฝ๋œ ์ž‘์—…์„ ์žฌ์‹คํ–‰ํ•˜์ง€ ์•Š์Œ
)

# ์ž‘์—… ์ •์˜
def task1():
    print("Task 1")

def task2():
    print("Task 2")

# DAG์— ์ž‘์—… ์ถ”๊ฐ€
t1 = PythonOperator(
    task_id='task1',
    python_callable=task1,
    dag=dag,
)

t2 = PythonOperator(
    task_id='task2',
    python_callable=task2,
    dag=dag,
)

# ์˜์กด์„ฑ ์ •์˜
t1 >> t2
  • ์œ„ ์˜ˆ์ œ์—์„œ t2๋Š” t1์ด ์„ฑ๊ณต์ ์œผ๋กœ ์™„๋ฃŒ๋œ ํ›„์—๋งŒ ์‹คํ–‰๋จ
  • 'schedule_interval'์€ DAG๊ฐ€ ์–ผ๋งˆ๋‚˜ ์ž์ฃผ ์‹คํ–‰๋ ์ง€๋ฅผ ์ •์˜
  • 'start_date'๋Š” DAG์˜ ์ตœ์ดˆ ์‹คํ–‰ ๋‚ ์งœ๋ฅผ ์ •์˜
๋ฐ˜์‘ํ˜•