๋ฐ์ํ
PythonOperator๋ Apache Airflow์์ Python ํจ์๋ฅผ ์คํํ๋ ์์
์ ์ ์ํ๋ ๋ฐ ์ฌ์ฉ๋๋ ์ฐ์ฐ์์ด๋ค. ์ด๋ฅผ ํตํด Python ํจ์๋ฅผ ํธ์ถํ์ฌ ๋ฐ์ดํฐ ์ฒ๋ฆฌ, ๊ณ์ฐ, ๋๋ ์ฌ์ฉ์ ์ง์ ์์
์ ์ํํ ์ ์๋ค. ์๋๋ PythonOperator๋ฅผ ์ฌ์ฉํ๋ ๊ฐ๋จํ ์์ ์ด๋ค.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
# DAG ์ ์
dag = DAG(
'python_operator_example',
description='Example DAG with PythonOperator',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 1, 1),
catchup=False,
)
# Python ํจ์ ์ ์
def my_python_function():
print("Hello, Airflow!")
print("Current date:", datetime.now())
# PythonOperator ์ ์
python_task = PythonOperator(
task_id='python_task',
python_callable=my_python_function,
dag=dag,
)
# DAG ๊ฐ์ ์์กด์ฑ ์ค์
# ๋ค๋ฅธ ์์
๋ค๊ณผ์ ์์กด์ฑ์ด ์๋ค๋ฉด ์ฌ๊ธฐ์ ์ถ๊ฐ
# DAG ์คํ
if __name__ == "__main__":
dag.cli()
- ์ด ์์ ์์ PythonOperator๋ฅผ ์์ฑํ ๋ task_id๋ก ์์ ์ ๊ณ ์ ํ ์๋ณ์๋ฅผ ์ค์ ํ๊ณ , python_callable ๋งค๊ฐ๋ณ์์๋ ์คํํ Python ํจ์๋ฅผ ์ ๋ฌ
- ์ด ํจ์๋ Airflow ์คํ ํ๊ฒฝ์์ ์คํ๋๋ฉฐ, ํจ์์ ์ถ๋ ฅ ๋ฐ ์ค๋ฅ๋ ์๋์ผ๋ก Airflow ๋ก๊ทธ์ ๊ธฐ๋ก๋จ
- DAG ๋ด์์ ์์ ๊ฐ์ ์์กด์ฑ์ ์ค์ ํ๋ ค๋ฉด ํ์ํ ์์ ์ task_id๋ฅผ ์ฌ์ฉํ์ฌ ์์กด์ฑ ์ฐ์ฐ์๋ฅผ ์ ์ํ๋ฉด ๋จ
- ์ด๋ set_upstream, set_downstream, ๋๋ >>, << ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ์ฌ ์ํํ ์ ์์
๋ฐ์ํ