[Airflow] Watching a DB query result and holding the workflow until a condition is met | SqlSensor

by 뭅즤 2023. 11. 20.

Apache Airflow's SqlSensor watches the result of a database query and holds downstream work until a given condition is met. It is mainly used to check whether a query returns a particular value or satisfies a particular condition.

 

How to use SqlSensor

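# Import path used in this post. Recent Airflow 2.x releases ship the sensor in the
# common-sql provider as airflow.providers.common.sql.sensors.sql.SqlSensor.
from airflow.sensors.sql import SqlSensor

# `dag` and `your_next_task` are assumed to be defined elsewhere in the DAG file.
sql_sensor_task = SqlSensor(
    task_id='sql_sensor_task',
    conn_id='your_database_connection_id',  # database connection ID
    sql='SELECT COUNT(*) FROM your_table WHERE your_condition;',  # query to watch
    mode='poke',  # either 'poke' or 'reschedule'
    timeout=600,  # give up after this many seconds
    poke_interval=60,  # seconds between query runs
    dag=dag  # Airflow DAG object
)


# Wire the sensor into the DAG: your_next_task runs only after the sensor succeeds
sql_sensor_task >> your_next_task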
  • conn_id: A string that identifies the database connection. The connection must be defined as an Airflow Connection.
  • sql: The database query to watch.
  • mode: How the sensor waits. 'poke' keeps its worker slot for the whole wait and sleeps between checks; 'reschedule' releases the slot between checks and is rescheduled for the next one.
  • timeout: Maximum time the sensor waits before it fails (in seconds).
  • poke_interval: Interval between query runs (in seconds). It applies in both modes.

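SqlSensor re-runs the configured query at each interval, and the next task runs once the condition is met. By default the condition is met when the first cell of the first row is anything other than 0, '0', an empty string, or NULL, so the COUNT(*) query above succeeds as soon as at least one row matches.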
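The check-and-wait behaviour described above can be modelled without Airflow. The following is a minimal sketch, not Airflow's actual implementation: it uses an in-memory SQLite database, and the names `poke`, `wait_for`, and `FALSY_CELLS`, as well as the `status = 'done'` condition, are assumptions made for illustration only.

```python
import sqlite3
import time

# Values treated as "condition not met yet" (mirrors the default rule described above).
FALSY_CELLS = (0, "0", "", None)


def poke(conn, sql):
    """Run the query once; True if the first cell of the first row passes."""
    row = conn.execute(sql).fetchone()
    if row is None:  # no rows at all: keep waiting
        return False
    return row[0] not in FALSY_CELLS


def wait_for(conn, sql, poke_interval, timeout):
    """Re-run the query every poke_interval seconds until it passes or timeout elapses."""
    started = time.monotonic()
    while not poke(conn, sql):
        if time.monotonic() - started > timeout:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(poke_interval)
    return True


# Hypothetical table and condition standing in for your_table / your_condition.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE your_table (status TEXT)")
query = "SELECT COUNT(*) FROM your_table WHERE status = 'done'"

before = poke(conn, query)  # COUNT(*) is 0, so the sensor would keep waiting
conn.execute("INSERT INTO your_table VALUES ('done')")
after = poke(conn, query)   # COUNT(*) is 1, so the sensor would succeed
print(before, after)
```

With timeout=600 and poke_interval=60 as in the example above, a loop like `wait_for` would run the query roughly every minute and give up after about ten minutes.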

 

 

๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์—ฐ๊ฒฐ ์ •๋ณด ์„ค์ •

In Airflow, database connection details are configured as a Connection. By default, Connections are stored in Airflow's metadata database and are looked up by their connection ID while a DAG (Directed Acyclic Graph) runs.

A Connection can be created from either the Airflow web interface or the Airflow command-line interface. The steps below use the web interface.

  1. Open the Airflow web interface
  2. Select Connections from the Admin menu
  3. Click the "Create" button to add a new Connection
  4. Enter the connection details (host, port, user name, password, and so on) and save
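
The same Connection can also be defined without the web interface. Airflow accepts a connection written as a URI, either through the `airflow connections add` command or through an environment variable named `AIRFLOW_CONN_` followed by the upper-cased connection ID. The sketch below only builds those strings. The host, port, user, password, and schema are made-up placeholder values, and `build_conn_uri` is a hypothetical helper, not part of Airflow.

```python
from urllib.parse import quote


def build_conn_uri(conn_type, host, port, login, password, schema=""):
    """Assemble a connection URI, percent-encoding the parts that may contain special characters."""
    return (
        f"{conn_type}://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(schema, safe='')}"
    )


conn_id = "your_database_connection_id"

# Placeholder values for illustration only.
uri = build_conn_uri(
    conn_type="postgres",
    host="db.example.com",
    port=5432,
    login="airflow_user",
    password="p@ss/word",  # '@' and '/' must be percent-encoded inside a URI
    schema="analytics",
)

# Option 1: command-line interface
cli_command = f"airflow connections add {conn_id} --conn-uri '{uri}'"

# Option 2: environment variable read by Airflow at run time
env_var_name = "AIRFLOW_CONN_" + conn_id.upper()

print(cli_command)
print(f"{env_var_name}={uri}")
```

Whichever route is used, the connection ID must match the `conn_id` passed to SqlSensor.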
๋ฐ˜์‘ํ˜•