Apache Airflow์ SqlSensor๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฟผ๋ฆฌ์ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ์ํ๊ณ , ํน์ ์กฐ๊ฑด์ด ์ถฉ์กฑ๋ ๋๊น์ง ์์ ์ ์ผ์ ์ค์งํ๋ ์ญํ ์ ํฉ๋๋ค. ์ด ์ผ์๋ ์ฃผ๋ก ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฟผ๋ฆฌ์ ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ์ฌ ํน์ ๊ฐ์ด๋ ์กฐ๊ฑด์ด ์ถฉ์กฑ๋์๋์ง๋ฅผ ํ์ธํ๋ ๋ฐ ์ฌ์ฉ๋ฉ๋๋ค.
SqlSensor ์ฌ์ฉ ๋ฐฉ๋ฒ
from airflow.sensors.sql import SqlSensor
sql_sensor_task = SqlSensor(
task_id='sql_sensor_task',
conn_id='your_database_connection_id', # ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ID
sql='SELECT COUNT(*) FROM your_table WHERE your_condition;', # ๊ฐ์ํ ์ฟผ๋ฆฌ
mode='poke', # 'poke' ๋๋ 'reschedule' ์ค ์ ํ
timeout=600, # ํ์์์ (์ด ๋จ์)
poke_interval=60, # 'poke' ๋ชจ๋์์ ์ฟผ๋ฆฌ ๊ฐ์ ๊ฐ๊ฒฉ (์ด ๋จ์)
dag=dag # Airflow DAG ๊ฐ์ฒด
)
# DAG์ ์ถ๊ฐ
sql_sensor_task >> your_next_task
- conn_id: ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ์ ์๋ณํ๋ ๋ฌธ์์ด. ์ด ์ฐ๊ฒฐ์ Airflow์ Connection์์ ์ค์ ํด์ผ ํ๋ค
- sql: ๊ฐ์ํ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฟผ๋ฆฌ
- mode: ์ผ์ ๋์ ๋ชจ๋. 'poke'๋ ์ง์์ ์ผ๋ก ๊ฐ์ํ๊ณ , 'reschedule'์ ์ผ์ ๊ฐ๊ฒฉ์ผ๋ก ์๋ก์ด ์์ ์ ์์ฝ
- timeout: ์ผ์ ์์ ์ ์ค์งํ ์ต๋ ์๊ฐ (์ด ๋จ์).
- poke_interval: 'poke' ๋ชจ๋์์ ์ฟผ๋ฆฌ๋ฅผ ๋ค์ ์คํํ ๊ฐ๊ฒฉ (์ด ๋จ์).
SqlSensor๋ ์ค์ ๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฟผ๋ฆฌ์ ๊ฒฐ๊ณผ๋ฅผ ์ฃผ๊ธฐ์ ์ผ๋ก ๊ฐ์ํ๋ฉฐ, ํน์ ์กฐ๊ฑด์ด ์ถฉ์กฑ๋๋ฉด ๋ค์ ์์ ์ด ์คํํ๋ค.
๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ์ ๋ณด ์ค์
Airflow์์ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ์ ๋ณด๋ Airflow์ Connection์์ ์ค์ ํ ์ ์๋๋ฐ, ์ด๋ Airflow์ ๋ฉํ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๊ด๋ จ์ด ์์ผ๋ฉฐ DAG (Directed Acyclic Graph) ์คํ ๋์ ์ฌ์ฉ๋๋ค.
Airflow ์น ์ธํฐํ์ด์ค ๋๋ Airflow ๋ช
๋ นํ ์ธํฐํ์ด์ค๋ฅผ ์ฌ์ฉํ์ฌ Connection์ ์ค์ ํ ์ ์๊ณ , ๋ค์์ ์น ์ธํฐํ์ด์ค๋ฅผ ์ด์ฉํ connection ์ค์ ๋ฐฉ๋ฒ์ด๋ค.
- Airflow ์น ์ธํฐํ์ด์ค๋ก ์ด๋
- Admin ๋ฉ๋ด์์ Connections์ ์ ํ
- "Create" ๋ฒํผ์ ํด๋ฆญํ์ฌ ์๋ก์ด Connection์ ์ถ๊ฐ
- ์ฐ๊ฒฐ์ ํ์ํ ์ ๋ณด (ํธ์คํธ, ํฌํธ, ์ฌ์ฉ์ ์ด๋ฆ, ์ํธ ๋ฑ)๋ฅผ ์ ๋ ฅํ๊ณ ์ ์ฅ