
[Airflow] Running a Python Function | Using PythonOperator

by 뭅즤 2023. 11. 19.
๋ฐ˜์‘ํ˜•


PythonOperator๋Š” Apache Airflow์—์„œ Python ํ•จ์ˆ˜๋ฅผ ์‹คํ–‰ํ•˜๋Š” ์ž‘์—…์„ ์ •์˜ํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋˜๋Š” ์—ฐ์‚ฐ์ž์ด๋‹ค. ์ด๋ฅผ ํ†ตํ•ด Python ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ, ๊ณ„์‚ฐ, ๋˜๋Š” ์‚ฌ์šฉ์ž ์ง€์ • ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค. ์•„๋ž˜๋Š” PythonOperator๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฐ„๋‹จํ•œ ์˜ˆ์ œ์ด๋‹ค.

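from datetime import datetime, timedelta

from airflow import DAG
# Airflow 2.x import path (airflow.operators.python_operator is deprecated)
from airflow.operators.python import PythonOperator

# Define the DAG
dag = DAG(
    'python_operator_example',
    description='Example DAG with PythonOperator',
    schedule=timedelta(days=1),  # Airflow 2.4+; older versions use schedule_interval
    start_date=datetime(2023, 1, 1),
    catchup=False,  # do not backfill runs between start_date and now
)

# Define the Python function the task will call
def my_python_function():
    # print() output is captured in the task log
    print("Hello, Airflow!")
    print("Current date:", datetime.now())

# Define the PythonOperator
python_task = PythonOperator(
    task_id='python_task',               # unique ID of this task within the DAG
    python_callable=my_python_function,  # pass the function itself, do not call it
    dag=dag,
)

# Set dependencies between tasks
# If this task depends on other tasks, declare those dependencies here

# Run the whole DAG once in a single local process for testing (Airflow 2.5+)
if __name__ == "__main__":
    dag.test()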
  • ์ด ์˜ˆ์ œ์—์„œ PythonOperator๋ฅผ ์ƒ์„ฑํ•  ๋•Œ task_id๋กœ ์ž‘์—…์˜ ๊ณ ์œ ํ•œ ์‹๋ณ„์ž๋ฅผ ์„ค์ •ํ•˜๊ณ , python_callable ๋งค๊ฐœ๋ณ€์ˆ˜์—๋Š” ์‹คํ–‰ํ•  Python ํ•จ์ˆ˜๋ฅผ ์ „๋‹ฌ
  • ์ด ํ•จ์ˆ˜๋Š” Airflow ์‹คํ–‰ ํ™˜๊ฒฝ์—์„œ ์‹คํ–‰๋˜๋ฉฐ, ํ•จ์ˆ˜์˜ ์ถœ๋ ฅ ๋ฐ ์˜ค๋ฅ˜๋Š” ์ž๋™์œผ๋กœ Airflow ๋กœ๊ทธ์— ๊ธฐ๋ก๋จ
  • DAG ๋‚ด์—์„œ ์ž‘์—… ๊ฐ„์˜ ์˜์กด์„ฑ์„ ์„ค์ •ํ•˜๋ ค๋ฉด ํ•„์š”ํ•œ ์ž‘์—…์˜ task_id๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์˜์กด์„ฑ ์—ฐ์‚ฐ์ž๋ฅผ ์ •์˜ํ•˜๋ฉด ๋จ
  • ์ด๋Š” set_upstream, set_downstream, ๋˜๋Š” >>, << ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ์Œ
๋ฐ˜์‘ํ˜•