
[Airflow] Running Shell Scripts and Commands | Using BashOperator

by 뭅즤 2023. 11. 19.
๋ฐ˜์‘ํ˜•

BashOperator is the Apache Airflow operator used to define tasks that run shell scripts or commands. With it you can run an external program, script, or command and check the result.

๋‹ค์Œ์€ BashOperator๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฐ„๋‹จํ•œ ์˜ˆ์ œ๋กœ, ๊ฐ„๋‹จํ•œ Bash ์Šคํฌ๋ฆฝํŠธ๋ฅผ ์‹คํ–‰ํ•˜๊ณ  ์ถœ๋ ฅ์„ ๋กœ๊น…ํ•œ๋‹ค.

from airflow import DAG
# Airflow 2.x import path (airflow.operators.bash_operator is the deprecated Airflow 1.x module)
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# DAG ์ •์˜
dag = DAG(
    'bash_operator_example',
    description='Example DAG with BashOperator',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
)

# Bash ์Šคํฌ๋ฆฝํŠธ ์ •์˜
bash_script = """
echo "Hello, Airflow!"
echo "Current date: $(date)"
"""

# BashOperator ์ •์˜
bash_task = BashOperator(
    task_id='bash_task',
    bash_command=bash_script,
    dag=dag,
)

# DAG ๊ฐ„์˜ ์˜์กด์„ฑ ์„ค์ •
# ๋‹ค๋ฅธ ์ž‘์—…๋“ค๊ณผ์˜ ์˜์กด์„ฑ์ด ์žˆ๋‹ค๋ฉด ์—ฌ๊ธฐ์— ์ถ”๊ฐ€

# Expose this DAG's command-line interface when the file is run directly
if __name__ == "__main__":
    dag.cli()
  • ์ด ์˜ˆ์ œ์—์„œ BashOperator๋ฅผ ์ƒ์„ฑํ•  ๋•Œ task_id๋กœ ์ž‘์—…์˜ ๊ณ ์œ ํ•œ ์‹๋ณ„์ž๋ฅผ ์„ค์ •ํ•˜๊ณ , bash_command ๋งค๊ฐœ๋ณ€์ˆ˜์—๋Š” ์‹คํ–‰ํ•  Bash ์Šคํฌ๋ฆฝํŠธ๋ฅผ ์ „๋‹ฌ.
  • ์Šคํฌ๋ฆฝํŠธ์˜ ์ถœ๋ ฅ์€ ์ž๋™์œผ๋กœ Airflow ๋กœ๊ทธ์— ๊ธฐ๋ก๋จ
  • DAG ๋‚ด์—์„œ ์ž‘์—… ๊ฐ„์˜ ์˜์กด์„ฑ์„ ์„ค์ •ํ•˜๋ ค๋ฉด ํ•„์š”ํ•œ ์ž‘์—…์˜ task_id๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์˜์กด์„ฑ ์—ฐ์‚ฐ์ž๋ฅผ ์ •์˜
  • ์ด๋Š” set_upstream, set_downstream, ๋˜๋Š” >>, << ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ์Œ
# Define another task
other_task = ...

# Set the dependency between the BashOperator and the other task
other_task >> bash_task  # other_task runs before bash_task
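
To make the relationship between >>, <<, set_downstream, and set_upstream more concrete, here is a minimal sketch in plain Python. The Task class is a hypothetical stand-in and not Airflow's own code; it only mirrors the behaviour that a >> b is equivalent to a.set_downstream(b) and returns b so that chains can be written. The cleanup_task name is likewise made up for illustration.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator; it only tracks dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # task_ids that must finish before this task
        self.downstream = set()  # task_ids that run after this task

    def set_downstream(self, other):
        # Record the edge self -> other on both ends.
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)

    def set_upstream(self, other):
        # Same edge seen from the other side: other -> self.
        other.set_downstream(self)

    def __rshift__(self, other):
        # a >> b means "a runs before b"; returning b allows a >> b >> c.
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # a << b means "a runs after b".
        self.set_upstream(other)
        return other


other_task = Task("other_task")
bash_task = Task("bash_task")
cleanup_task = Task("cleanup_task")  # hypothetical third task

other_task >> bash_task >> cleanup_task
print(bash_task.upstream)    # {'other_task'}
print(bash_task.downstream)  # {'cleanup_task'}
```

Writing bash_task << other_task or bash_task.set_upstream(other_task) would record the same edge as other_task >> bash_task; the operator form is usually preferred because it reads in execution order.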
๋ฐ˜์‘ํ˜•