728x90 ๐ป Programming/Apache Airflow6 [Airflow] DB ์ฟผ๋ฆฌ์ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ์ํ๊ณ , ํน์ ์กฐ๊ฑด์ด ์ถฉ์กฑ๋ ๋๊น์ง ์์ ์ ์ผ์ ์ค์งํ๋ ๊ธฐ๋ฅ | SqlSensor Apache Airflow์ SqlSensor๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฟผ๋ฆฌ์ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ์ํ๊ณ , ํน์ ์กฐ๊ฑด์ด ์ถฉ์กฑ๋ ๋๊น์ง ์์ ์ ์ผ์ ์ค์งํ๋ ์ญํ ์ ํฉ๋๋ค. ์ด ์ผ์๋ ์ฃผ๋ก ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฟผ๋ฆฌ์ ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ์ฌ ํน์ ๊ฐ์ด๋ ์กฐ๊ฑด์ด ์ถฉ์กฑ๋์๋์ง๋ฅผ ํ์ธํ๋ ๋ฐ ์ฌ์ฉ๋ฉ๋๋ค. SqlSensor ์ฌ์ฉ ๋ฐฉ๋ฒ from airflow.sensors.sql import SqlSensor sql_sensor_task = SqlSensor( task_id='sql_sensor_task', conn_id='your_database_connection_id', # ๋ฐ์ดํฐ๋ฒ ์ด์ค ์ฐ๊ฒฐ ID sql='SELECT COUNT(*) FROM your_table WHERE your_condition;', # ๊ฐ์ํ ์ฟผ๋ฆฌ mode='poke', #.. 2023. 11. 20. [Airflow] ์์ดํ๋ก์ฐ ์ค์น ๋ฐ ์น ์ธํฐํ์ด์ค ์คํํ๊ธฐ 1. Airflow ์ค์น pip install apache-airflow 2. Airflow ์ค์ cd airflow airflow db init mkdir dags ๋ง๋ค์ด์ง airflow ํด๋๋ก ๋ค์ด๊ฐ์ db๋ฅผ init ํด์ฃผ๊ณ dags ํด๋๋ฅผ ์์ฑ airflow users create -u admin -p admin -f Clueless -l Coder -r Admin -e admin@admin.com ๊ด๋ฆฌ์ ๊ณ์ ์์ฑ 3. Airflow ์คํ airflow webserver -p 8080 8080 ํฌํธ๋ก ์์ดํ๋ก์ฐ ์คํ 'localhost:8080' ๋ก ์ ์ 4. Airflow ์น ์ธํฐํ์ด์ค Apache Airflow ์น์๋ฒ๋ Airflow ์ํฌํ๋ก์ฐ์ ์๊ฐํ, ๋ชจ๋ํฐ๋ง ๋ฐ ๊ด๋ฆฌ๋ฅผ ์ํ ์ฌ์ฉ์ ์ธํฐํ.. 2023. 11. 20. [Airflow] Python ํจ์ ์คํํ๊ธฐ | PythonOperator ์ฌ์ฉ PythonOperator๋ Apache Airflow์์ Python ํจ์๋ฅผ ์คํํ๋ ์์ ์ ์ ์ํ๋ ๋ฐ ์ฌ์ฉ๋๋ ์ฐ์ฐ์์ด๋ค. ์ด๋ฅผ ํตํด Python ํจ์๋ฅผ ํธ์ถํ์ฌ ๋ฐ์ดํฐ ์ฒ๋ฆฌ, ๊ณ์ฐ, ๋๋ ์ฌ์ฉ์ ์ง์ ์์ ์ ์ํํ ์ ์๋ค. ์๋๋ PythonOperator๋ฅผ ์ฌ์ฉํ๋ ๊ฐ๋จํ ์์ ์ด๋ค. from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime, timedelta # DAG ์ ์ dag = DAG( 'python_operator_example', description='Example DAG with PythonOperator', schedule_inte.. 2023. 11. 19. [Airflow] ์ ์คํฌ๋ฆฝํธ, ๋ช ๋ น์ด ์คํํ๊ธฐ | BashOperator ์ฌ์ฉ BashOperator๋ Apache Airflow์์ ์ ์คํฌ๋ฆฝํธ๋ ๋ช ๋ น์ด๋ฅผ ์คํํ๋ ์์ ์ ์ ์ํ๋ ๋ฐ ์ฌ์ฉ๋๋ ์ฐ์ฐ์์ด๋ค. ์ด๋ฅผ ํตํด ์ธ๋ถ ํ๋ก๊ทธ๋จ, ์คํฌ๋ฆฝํธ ๋๋ ๋ช ๋ น์ด๋ฅผ ์คํํ๊ณ ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ ์ ์๋ค. ๋ค์์ BashOperator๋ฅผ ์ฌ์ฉํ๋ ๊ฐ๋จํ ์์ ๋ก, ๊ฐ๋จํ Bash ์คํฌ๋ฆฝํธ๋ฅผ ์คํํ๊ณ ์ถ๋ ฅ์ ๋ก๊น ํ๋ค. from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # DAG ์ ์ dag = DAG( 'bash_operator_example', description='Example DAG with BashOperator', sched.. 2023. 11. 19. [Airflow] ์ผ์ ํ ๊ฐ๊ฒฉ์ผ๋ก DAG ์คํํ๊ธฐ (์ค์ผ์ค๋ง) | schedule_interval | cron ๊ธฐ๋ฐ ์ค์ผ์ค Apache Airflow์์ DAG์ ์ผ์ ํ ๊ฐ๊ฒฉ์ผ๋ก ์คํํ๋ ค๋ฉด schedule_interval ๋งค๊ฐ๋ณ์๋ฅผ ์ฌ์ฉํ๋ค. ์ด ๋งค๊ฐ๋ณ์๋ DAG์ด ์คํ๋ ์ฃผ๊ธฐ๋ฅผ ๋ํ๋ด๊ณ , ์ฃผ๊ธฐ๋ timedelta ๊ฐ์ฒด๋ก ์ ์๋๋ค. ์๋ฅผ ๋ค์ด, ๋งค์ผ ์คํํ๋ ค๋ฉด timedelta(days=1)๊ณผ ๊ฐ์ด ์ ์ํ ์ ์๋ค. timedelta๋ฅผ ์ฌ์ฉํ๋ ๋ช ๊ฐ์ง ์์ ๋ฅผ ์ดํด๋ณด์. timedelta # ๋งค์ผ ์คํ schedule_interval=timedelta(days=1) # 3์ผ๋ง๋ค ์คํ schedule_interval=timedelta(days=3) # ๋งค์ฃผ ์์์ผ ์คํ schedule_interval=timedelta(weeks=1, days=1) # ๋งค์๊ฐ ์คํ schedule_interval=timedelta(hours=1.. 2023. 11. 19. [Airflow] Airflow & DAG ์ค๋ช Apache Airflow Apache Airflow๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ๊ด๋ฆฌํ๊ณ ์ค์ผ์ค๋งํ๊ธฐ ์ํ ์คํ ์์ค ํ๋ซํผ์ด๋ค. Airflow๋ ํ์ด์ฌ ์ฝ๋๋ฅผ ์ด์ฉํด ํ์ดํ๋ผ์ธ์ ๊ตฌํํ ์ ์๊ธฐ์ ํ์ด์ฌ ์ธ์ด๋ก ๊ตฌํํ ์ ์๋ ๋๋ถ๋ถ์ ๋ฐฉ๋ฒ์ ์ฌ์ฉํ์ฌ ์ฌ๋ฌ ์ปค์คํ ํ์ดํ๋ผ์ธ์ ๋ง๋ค ์ ์๋ค. ๋ํ ์ฝ๊ฒ ํ์ฅ ๊ฐ๋ฅํ๊ณ ๋ค์ํ ์์คํ ๊ณผ ํตํฉ์ด ๊ฐ๋ฅํ๋ค. ์๋ง์ ์ค์ผ์ค๋ง ๊ธฐ๋ฒ์ผ๋ก ํ์ดํ๋ผ์ธ์ ์ ๊ธฐ์ ์ผ๋ก ์คํํ๊ณ ์ ์ง์ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํ๊ณ ์คํ ์์ค๋ผ๋ ์ฅ์ ์ด ์๊ธฐ ๋๋ฌธ์ ๋ง์ ๊ธฐ์ ์์ Airflow๋ฅผ ์ฌ์ฉํ๊ณ ์๋ค. (๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์ ์ ์กฐ์งํ๊ณ ์คํํ๊ธฐ ์ํ ์ผ๋ จ์ ๋จ๊ณ ๋ฐ ํ๋ก์ธ์ค) Apache Airflow์ ํน์ง ์ค์ผ์ค๋ง๊ณผ ๋ชจ๋ํฐ๋ง Airflow๋ ์์ ์ ์ค์ผ์ค๋งํ๊ณ ๊ฐ์ํ๋ ๋ฐ ์ฌ์ฉ๋จ ์.. 2023. 11. 19. ์ด์ 1 ๋ค์ 728x90