1. ๊ฐ์
PySpark๋ Apache Spark์ Python API์ด๋ฉฐ, ๋๊ท๋ชจ ๋ฐ์ดํฐ๋ฅผ ๋ถ์ฐ ํ๊ฒฝ์์ ์ฒ๋ฆฌํ๊ธฐ ์ํ ํ์ค ๋๊ตฌ์ด๋ค. ML ๋ฆฌ์์น ์์ง๋์ด์๊ฒ PySpark๋ ํ๋ค์ค์ ์ ์ฌํ ๋ฌธ๋ฒ์ ์ ์งํ๋ฉด์๋ ํด๋ฌ์คํฐ ์ ์ฒด ์์์ ํ์ฉํด ํ์ฅ๋ ์ฒ๋ฆฌ๋ฅผ ๊ฐ๋ฅํ๊ฒ ํ๋ ์ ์์ ์ ์ฉํ๋ค. ์ด ๊ธ์์๋ PySpark๋ฅผ ์ด๋์, ์ ์ฌ์ฉํ๋์ง, ๊ทธ๋ฆฌ๊ณ ์ค๋ฌด์๊ฐ ์ด๋ค ๋ถ๋ถ์ ์ค์ ์ ์ผ๋ก ์ดํดํด์ผ ํ๋์ง ์ค๋ช ํ๋ค.
2. PySpark๋ ์ด๋์ ์ฐ๋๊ฐ
PySpark๋ ๋ฐ์ดํฐ ์์ง๋์ด๋ฟ๋ง ์๋๋ผ ML ์์ง๋์ด์๊ฒ๋ ๋ฐ์ดํฐ ์ค๋น์ ์ ์ฒ๋ฆฌ์ ํต์ฌ ๋๊ตฌ๋ก ํ์ฉ๋๋ค. ํนํ ๋์ฉ๋ ๋ฐ์ดํฐ์ ์ ๋ค๋ฃฐ ๋ Pandas๋ก๋ ์ฒ๋ฆฌํ๊ธฐ ์ด๋ ค์ด ๊ฒฝ์ฐ๊ฐ ๋ง์ PySpark๊ฐ ํ์ํ๋ค. ๋ฐ์ดํฐ ๋ถ์๊ณผ ML ์คํ์ ์ํด์๋ ๋น ๋ฅด๊ฒ ๋ฐ์ดํฐ๋ฅผ ๊ฐ๊ณตํ๊ณ ํจ์จ์ ์ผ๋ก ์ ์ฅํ๋ ๊ฒ์ด ์ค์ํ๋ฐ, PySpark๋ ์ด ๊ณผ์ ์ ๋ถ์ฐ ํ๊ฒฝ์์ ์์ ์ ์ผ๋ก ์ํํ ์ ์๋ค.
์ค๋ฌด์์ PySpark๋ฅผ ์์ฃผ ํ์ฉํ๋ ์์ญ์ ๋ค์๊ณผ ๊ฐ๋ค.
- ๋๊ท๋ชจ ๋ก๊ทธ ๋ฐ ๋ฉํ๋ฐ์ดํฐ ์ ์ฒ๋ฆฌ
- ํ์ต/๊ฒ์ฆ ๋ฐ์ดํฐ์ ์์ฑ๊ณผ ์คํ๋ฆฟ
- ๋ค์ค ์กฐ์ธ ๋ฐ ํต๊ณ ์ง๊ณ
- ์ด๋ฏธ์ง/๋น๋์ค ๋ฉํ๋ฐ์ดํฐ ์ฒ๋ฆฌ
- ์ธํผ๋ฐ์ค ๊ฒฐ๊ณผ ๋๋ ์ง๊ณ ๋ฐ ์งํ ์ฐ์ถ
3. PySpark๋ฅผ ์ ์ฐ๋๊ฐ
๋ฐ์ดํฐ์ ML ๊ด์ ์์ PySpark์ ๊ฐ์น๋ ํฌ๊ฒ ๋ค ๊ฐ์ง๋ก ์์ฝํ ์ ์๋ค. ์ฒซ์งธ, ํ์ฅ์ฑ์ด๋ค. ๋จ์ผ ๋จธ์ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ด๊ณผํ๋ ๋ฐ์ดํฐ๋ ํด๋ฌ์คํฐ์ ๋ถ์ฐํ์ฌ ์ฒ๋ฆฌํ ์ ์๋ค. ๋์งธ, ์ต์ ํ๋ ์คํ ์์ง์ด๋ค. Catalyst Optimizer์ Tungsten ์คํ ์์ง์ ํตํด ๋ณต์กํ ์กฐ์ธ๊ณผ ์ง๊ณ๋ฅผ ํจ์จ์ ์ผ๋ก ์ํํ ์ ์๋ค. ์ ์งธ, ํ์คํ๋ ์ํ๊ณ ์ฐ๊ณ์ฑ์ด๋ค. Parquet, ORC ๊ฐ์ ํฌ๋งท๊ณผ Hive Metastore, Presto/Trino ๊ฐ์ ๋ถ์ ํด๊ณผ ๋งค๋๋ฝ๊ฒ ์ฐ๋๋๋ค. ๋ท์งธ, ๊ฐ๋ฐ ํธ์์ฑ์ด๋ค. ํ๋ค์ค์ ์ ์ฌํ API์ SQL ํผ์ฉ์ด ๊ฐ๋ฅํด Python ์ค์ฌ ์ํฌํ๋ก์ฐ์ ์์ฐ์ค๋ฝ๊ฒ ๋ น์๋ ๋ค.
ํต์ฌ ์ฅ์ ์ ์ ๋ฆฌํ๋ฉด ๋ค์๊ณผ ๊ฐ๋ค.
- ํ์ฅ ๊ฐ๋ฅํ ๋ถ์ฐ ์ฒ๋ฆฌ
- Catalyst Optimizer ๊ธฐ๋ฐ ์ฑ๋ฅ ์ต์ ํ
- ํ์ค ํฌ๋งท ๋ฐ ๋ฐ์ดํฐ ์ํ๊ณ ์ฐ๋
- Python ์นํ์ API์ ๊ฐ๋ฐ ํธ์์ฑ
4. ์ค๋ฌด์๊ฐ ์์์ผ ํ PySpark ๋์ ์๋ฆฌ
ML ์์ง๋์ด์๊ฒ ์ค์ํ ๊ฒ์ Spark ๋ด๋ถ ์ํคํ ์ฒ ์ ์ฒด๊ฐ ์๋๋ผ, ๋ฐ์ดํฐ๋ฅผ ์ด๋ป๊ฒ ๋ค๋ฃจ๊ณ ์ต์ ํํด์ผ ํ๋์ง์ ๋ํ ๊ธฐ๋ณธ ์๋ฆฌ์ด๋ค.
- Lazy evaluation: Transformation์ ์ฆ์ ์คํ๋์ง ์๊ณ Action ํธ์ถ ์ Job์ด ์คํ๋๋ค.
- ์ค๊ฐ ๊ฒฐ๊ณผ๋ฅผ ์ ๊ฒํ ๋๋ show(), count() ๊ฐ์ ๊ฐ๋ฒผ์ด Action๋ง ์ฌ์ฉ
- ์ฌ๋ฌ ๋ฒ ์ฌ์ฌ์ฉํ DataFrame์ cache()๋ persist() ํ ํ ๋ฒ๋ง Action์ผ๋ก ๋ฌผ๋ฆฌํ
- Job → Stage → Task ๊ตฌ์กฐ: ์กฐ์ธ์ด๋ ์ง๊ณ ๊ฐ์ ์ฐ์ฐ์ ์
ํ์ ๋ฐ์์ํค๋ฉฐ Stage๋ก ๋ถ๋ฆฌ๋๋ค.
- ํํฐ/์ปฌ๋ผ ์ ํ์ ์กฐ์ธ ์ ์ ์ ์ฉํด ์ ํ ์ ์ ๋ฐ์ดํฐ๋ ์ต์ํ
- ์์ ์ฐธ์กฐ ํ ์ด๋ธ์ broadcast() ์กฐ์ธ์ผ๋ก ์ ํ ์ค์ด๊ธฐ
- Narrow vs Wide Transformation: map, filter๋ ์
ํ์ด ์์ง๋ง, groupBy, join์ ์
ํ์ ์ ๋ฐํ๋ค.
- wide ์ฐ์ฐ(์ง๊ณ/์กฐ์ธ)์ ์ต๋ํ ์๋จ์์ ๋ฐ์ดํฐ ์ค์ธ ๋ค ์คํ
- Partition๊ณผ ํ์ผ ํฌ๊ธฐ ๊ด๋ฆฌ: Parquet ์ถ๋ ฅ์ 128~512MB ํ์ผ ํฌ๊ธฐ๋ฅผ ๊ถ์ฅํ๋ค.
- ๊ฒฐ๊ณผ ์ ์ฅ ์ repartition()์ผ๋ก ํํฐ์ ์ ์กฐ์
- ์์ ํ์ผ์ด ๋ง์์ง๋ฉด coalesce()๋ก ๋ณํฉ
- ์ถ๋ ฅ ํฌ๊ธฐ์ ๋ง์ถฐ ์ ์ ํ ํํฐ์ ์ ์ฐ์ (์: 200GB ÷ 256MB ≈ 800 ํํฐ์ )
์ด ์๋ฆฌ๋ค์ ์ดํดํ๋ฉด ๋ถํ์ํ ์ ํ๊ณผ ์์ ํ์ผ ๋ฌธ์ ๋ฅผ ์ค์ด๊ณ , ์ฑ๋ฅ ์ ํ๋ฅผ ์๋ฐฉํ ์ ์๋ค.
5. ์ค๋ฌด ํ์ฉ ์์
PySpark๋ ๋ฐ์ดํฐ ์ค๋น ํ์ดํ๋ผ์ธ์์ ์ค์ฉ์ ์ผ๋ก ์ฐ์ธ๋ค. ์๋ฅผ ๋ค์ด, RDS ๊ฐ์ DB์์ ์์ฒ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ ํ์ต์ฉ ๋ฐ์ดํฐ์ ์ ์์ฑํ๋ ๊ณผ์ ์ ์๊ฐํด๋ณด์.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast, expr
spark = (SparkSession.builder.appName("etl-orders").getOrCreate())
# 1. ๋ฐ์ดํฐ ์ฝ๊ธฐ (JDBC)
orders = (spark.read.format("jdbc")
.option("url", JDBC_URL)
.option("dbtable","orders")
.option("user", DB_USER)
.option("password", DB_PASS)
.option("partitionColumn","id")
.option("lowerBound",1).option("upperBound",10_000_000).option("numPartitions",128)
.load()
.select("id","user_id","category_id","price","created_at")
.filter(col("created_at") >= expr("date_sub(current_date(), 30)")))
# 2. ์ฐธ์กฐ ๋ฐ์ดํฐ ์ฝ๊ธฐ (Parquet)
cats = spark.read.parquet("s3://bucket/dim/categories/").select("id","display_name")
# 3. ์กฐ์ธ๊ณผ ์ ์ฒ๋ฆฌ
joined = (orders.alias("o")
.join(broadcast(cats.alias("c")), col("o.category_id") == col("c.id"), "left")
.select(col("o.*"), col("c.display_name").alias("category")))
# 4. ์ง๊ณ ๋ฐ ์ ์ฅ
mart = (joined.groupBy("category")
.agg(expr("count(*) as n"), expr("avg(price) as avg_price"), expr("sum(price) as revenue"))
.orderBy(col("revenue").desc()))
(mart.repartition(64)
.write.mode("overwrite")
.parquet("s3://bucket/mart/orders_30d/"))
spark.stop()
์ด ์์๋ DB์์ ๋ฐ์ดํฐ ์ฝ๊ธฐ → ์กฐ์ธ ๋ฐ ์ ์ฒ๋ฆฌ → ์ง๊ณ → Parquet ์ ์ฅ๊น์ง ์ด์ด์ง๋ ์ ํ์ ์ธ ML ๋ฐ์ดํฐ ์ค๋น ์ํฌํ๋ก์ฐ์ด๋ค. ๋ ธํธ๋ถ์์ ๋จผ์ ์ํ ๋ฐ์ดํฐ๋ฅผ ํ์ธํ๊ณ , ๊ฒ์ฆ๋ ์ฝ๋๋ฅผ spark-submit์ผ๋ก ์ ์ถํด ์ ์ฒด ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ผ๋ก ํ์ฉํ ์ ์๋ค.
6. ๊ฒฐ๋ก
PySpark๋ Data / ML ์์ง๋์ด๊ฐ ๋๊ท๋ชจ ๋ฐ์ดํฐ๋ฅผ ์์ ์ ์ผ๋ก ์ ์ฒ๋ฆฌํ๊ณ ํ์ต ๋ฐ์ดํฐ์ ์ ๊ตฌ์ฑํ ๋ ํ์์ ์ธ ๋๊ตฌ์ด๋ค. ํ๋ค์ค์ ์ง๊ด์ฑ๊ณผ Spark์ ํ์ฅ์ฑ์ ๊ฒฐํฉํด, ๋ฐ์ดํฐ ์ค๋น ๋จ๊ณ์์ ๋ณ๋ชฉ์ ์ค์ด๊ณ ์ฐ๊ตฌ์ ์คํ์ ์ง์คํ ์ ์๋๋ก ๋๋๋ค. ์ค์ํ ๊ฒ์ DataFrame API์ ๊ธฐ๋ณธ ์ต์ ํ ์๋ฆฌ๋ฅผ ์์งํ๊ณ , ์ด๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ์ต ํ์ดํ๋ผ์ธ์ ์์ ์ ์ผ๋ก ์คํํ ์ ์๋ ์์ค๊น์ง ์ตํ๋ ๊ฒ์ด๋ค.
'๐ป Programming > Distributed Computing' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
| [Ray] ๋ถ์ฐ ์คํ ํ๋ ์์ํฌ Ray ์ค๋ช (0) | 2025.09.17 |
|---|---|
| [PySpark] Spark Job ์คํ ๊ฐ์ด๋: Ad-hoc vs Batch (0) | 2025.09.17 |
| [PySpark] ์ฃผ์ ์ฐ์ฐ ๊ฐ์ด๋: Transformation, Action (0) | 2025.09.16 |
| [PySpark] ์ฑ๋ฅ ์ต์ ํ ๊ธฐ๋ณธ๊ธฐ: ๋๊ท๋ชจ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํด (0) | 2025.09.16 |
| [PySpark] ์์ฃผ ์ฐ๋ ๊ธฐ๋ฅ ๋ฉ์๋ ์ ๋ฆฌ (0) | 2025.05.12 |