๋ฐ์ํ
BashOperator๋ Apache Airflow์์ ์ ์คํฌ๋ฆฝํธ๋ ๋ช
๋ น์ด๋ฅผ ์คํํ๋ ์์
์ ์ ์ํ๋ ๋ฐ ์ฌ์ฉ๋๋ ์ฐ์ฐ์์ด๋ค. ์ด๋ฅผ ํตํด ์ธ๋ถ ํ๋ก๊ทธ๋จ, ์คํฌ๋ฆฝํธ ๋๋ ๋ช
๋ น์ด๋ฅผ ์คํํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ ์ ์๋ค.
๋ค์์ BashOperator๋ฅผ ์ฌ์ฉํ๋ ๊ฐ๋จํ ์์ ๋ก, ๊ฐ๋จํ Bash ์คํฌ๋ฆฝํธ๋ฅผ ์คํํ๊ณ ์ถ๋ ฅ์ ๋ก๊น
ํ๋ค.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
# DAG ์ ์
dag = DAG(
'bash_operator_example',
description='Example DAG with BashOperator',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 1, 1),
catchup=False,
)
# Bash ์คํฌ๋ฆฝํธ ์ ์
bash_script = """
echo "Hello, Airflow!"
echo "Current date: $(date)"
"""
# BashOperator ์ ์
bash_task = BashOperator(
task_id='bash_task',
bash_command=bash_script,
dag=dag,
)
# DAG ๊ฐ์ ์์กด์ฑ ์ค์
# ๋ค๋ฅธ ์์
๋ค๊ณผ์ ์์กด์ฑ์ด ์๋ค๋ฉด ์ฌ๊ธฐ์ ์ถ๊ฐ
# DAG ์คํ
if __name__ == "__main__":
dag.cli()
- ์ด ์์ ์์ BashOperator๋ฅผ ์์ฑํ ๋ task_id๋ก ์์ ์ ๊ณ ์ ํ ์๋ณ์๋ฅผ ์ค์ ํ๊ณ , bash_command ๋งค๊ฐ๋ณ์์๋ ์คํํ Bash ์คํฌ๋ฆฝํธ๋ฅผ ์ ๋ฌ.
- ์คํฌ๋ฆฝํธ์ ์ถ๋ ฅ์ ์๋์ผ๋ก Airflow ๋ก๊ทธ์ ๊ธฐ๋ก๋จ
- DAG ๋ด์์ ์์ ๊ฐ์ ์์กด์ฑ์ ์ค์ ํ๋ ค๋ฉด ํ์ํ ์์ ์ task_id๋ฅผ ์ฌ์ฉํ์ฌ ์์กด์ฑ ์ฐ์ฐ์๋ฅผ ์ ์
- ์ด๋ set_upstream, set_downstream, ๋๋ >>, << ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ์ฌ ์ํํ ์ ์์
# ๋ค๋ฅธ ์์
์ ์
other_task = ...
# BashOperator์ ๋ค๋ฅธ ์์
๊ฐ์ ์์กด์ฑ ์ค์
other_task >> bash_task # ๋ค๋ฅธ ์์
์ด Bash ์์
๋ณด๋ค ๋จผ์ ์คํ๋๋๋ก ์ค์
๋ฐ์ํ