1. ๊ฐ์
PySpark์ ์ฐ์ฐ์ ํฌ๊ฒ Transformation๊ณผ Action์ผ๋ก ๊ตฌ๋ถ๋๋ค. ์ด ๊ตฌ๋ถ์ ์ดํดํ๋ฉด “์ด๋์ ์ฑ๋ฅ ๋น์ฉ์ด ๋ฐ์ํ๋์ง, ์ด๋ค ์ฝ๋๊ฐ ์ค์ ์คํ์ ํธ๋ฆฌ๊ฑฐํ๋์ง”๋ฅผ ๋ช ํํ ์ ์ ์์ด ์ ์ฒ๋ฆฌ์ ํ์ต ๋ฐ์ดํฐ ์ค๋น ๋จ๊ณ์์ ์ํ์ฐฉ์ค๋ฅผ ์ค์ผ ์ ์๋ค.
2. Transformation vs Action
2.1 ์ ์
- Transformation: ์๋ก์ด DataFrame์ ๋ฐํํ์ง๋ง ์ฆ์ ์คํ๋์ง ์๋๋ค(=lazy). Spark๋ lineage(์ฐ์ฐ ์ด๋ ฅ)๋ง ์๋๋ค.
- Action: Spark๊ฐ ์ค์ Job์ ์คํํ์ฌ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํ๊ฑฐ๋ ์ธ๋ถ ์ ์ฅ์์ ๊ธฐ๋กํ๋ค.
2.2 ์คํ ๋ชจ๋ธ ์์ฝ
- Action์ ๋ง๋๊ธฐ ์ ๊น์ง๋ Job์ด ์์ฑ๋์ง ์๋๋ค.
- Action์ ํธ์ถํ๋ ์๊ฐ DAG๊ฐ Job → Stage(์ ํ ๊ฒฝ๊ณ ๊ธฐ์ค) → Task(ํํฐ์ ๋จ์)๋ก ๋ถํด๋์ด ์คํ๋๋ค.
๋์ผ DataFrame์ Action์ ์ฌ๋ฌ ๋ฒ ํธ์ถํ๋ฉด ๋์ผ lineage๊ฐ ๋ฐ๋ณต ์คํ๋๋ค. ์ฌ์ฌ์ฉํ ์ค๊ฐ ๊ฒฐ๊ณผ๋ cache()/persist() ํ ํ ๋ฒ๋ง Action์ผ๋ก ๋ฌผ๋ฆฌํํ๋ ์ต๊ด์ด ์ฑ๋ฅ ์ต์ ํ์ ์ฒซ๊ฑธ์์ด๋ค.
3. Transformation ์ฐ์ฐ
์๋ ์ฐ์ฐ์ ๋ชจ๋ lazy์ด๋ฉฐ, ์ ํ์ ์ ๋ฐํ๋์ง ์ฌ๋ถ(์ข์/๋์ ๋ณํ)๋ฅผ ํจ๊ป ํ์ํ๋ค.
3.1 select / withColumn / drop (Narrow)
- select: ์ํ๋ ์ปฌ๋ผ๋ง ์ถ์ถ → ๋ถํ์ํ ์ปฌ๋ผ ์ ๊ฑฐ.
- withColumn: ์๋ก์ด ์ปฌ๋ผ ์์ฑ ๋๋ ๊ธฐ์กด ์ปฌ๋ผ ์์ .
- drop: ํน์ ์ปฌ๋ผ ์ ๊ฑฐ.
from pyspark.sql.functions import col, when
base = df.select("id","user_id","price","ts")
feat = base.withColumn("bucket", when(col("price")<50,"low").otherwise("high"))
3.2 filter (Narrow)
- ์กฐ๊ฑด์ ๋ง๋ ํ๋ง ๋จ๊น.
- ์กฐ์ธ·์ง๊ณ ์ ์ ๋จผ์ ์ ์ฉํด ๋ฐ์ดํฐ๋ ์ถ์.
recent = df.filter(col("ts") >= cutoff)
3.3 join (Wide, Shuffle)
- ๋ DataFrame์ ํน์ ํค๋ก ๊ฒฐํฉ.
- ๋๊ท๋ชจ ์ ํ ๋ฐ์ ๊ฐ๋ฅ, ์์ ํ ์ด๋ธ์ broadcast()๋ก ์ต์ ํ.
from pyspark.sql.functions import broadcast
joined = fact.join(broadcast(dim.select("id","name")), fact.dim_id==dim.id, "left")
3.4 groupBy + agg (Wide, Shuffle)
- groupBy(): ํน์ ์ปฌ๋ผ ๊ธฐ์ค์ผ๋ก ๊ทธ๋ฃนํํ์ฌ ํด๋น ์ปฌ๋ผ์ด ๊ทธ๋ฃน ํค๊ฐ ๋๊ณ , ํด๋น ์ปฌ๋ผ์ ๋์ผํ ๊ฐ์ ๊ฐ์ง ํ๋ค์ด ํ๋์ ๊ทธ๋ฃน์ผ๋ก ๋ฌถ์
- agg(): ์ง๊ณ ํจ์(avg, sum, count, max, min ๋ฑ)์ ํจ๊ป ์ฌ์ฉ.
| user_id | category | price |
| 1 | A | 100 |
| 2 | A | 200 |
| 3 | B | 300 |
| 4 | B | 400 |
| 5 | A | 50 |
from pyspark.sql.functions import avg, sum
agg = df.groupBy("category").agg(
avg("price").alias("avg_price"),
sum("price").alias("total_price")
)
agg.show()
| category | avg_price | total_price |
| A | 116.67 | 350 |
| B | 350.00 | 700 |
- ์ ์์๋ฅผ ๋ณด๋ฉด, category ์ปฌ๋ผ์ groupBy๋ก ๋ฌถ๊ณ
- avg, sum ํจ์๋ฅผ ์ฌ์ฉํด์ category ๋์ผ ๊ฐ๋ค์ ํ๊ท ๊ณผ ํฉ์ ๋ํ ์ปฌ๋ผ์ ์์ฑ
3.5 distinct / dropDuplicates (Wide, Shuffle)
- distinct: ์ ์ฒด ํ์์ ์ค๋ณต ์ ๊ฑฐ.
- dropDuplicates: ํน์ ์ปฌ๋ผ ๊ธฐ์ค์ผ๋ก ์ค๋ณต ์ ๊ฑฐ.
uniq = df.dropDuplicates(["user_id","date"])
3.6 repartition / coalesce (Shuffle/No-shuffle)
- repartition(N): ์ ํ์ ๋ฐ์์์ผ ํํฐ์ ์ ๊ท ๋ฑํ๊ฒ ์ฌ๋ถ๋ฐฐ.
- coalesce(N): ์ ํ ์์ด ํํฐ์ ์๋ง ์ถ์(๋ณ๋ ฌ์ฑ ์ ํ ์ฃผ์).
wide = df.repartition(800)
compact = wide.coalesce(100)
3.7 Window functions (Wide, Shuffle ๊ฐ๋ฅ)
- ํํฐ์ ๋ณ/์ ๋ ฌ ์กฐ๊ฑด๋ณ๋ก ์ ํ ๋งฅ๋ฝ ์ง๊ณ.
- ์์, ๋์ ํฉ, ์ต๊ทผ ๋ฐ์ดํฐ ์ถ์ถ ๋ฑ์ ํ์ฉ.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
w = Window.partitionBy("user_id").orderBy(col("ts").desc())
latest = df.withColumn("rn", row_number().over(w)).filter("rn=1")
4. Action ์ฐ์ฐ
Action์ “๊ฒฐ๊ณผ๋ฅผ ๋ชจ์ผ๊ฑฐ๋ ์ ์ฅํ๊ฑฐ๋ ์ถ๋ ฅ”ํ๋ ๋ชจ๋ ์ฐ์ฐ์ด๋ค.
4.1 ๊ฒฐ๊ณผ ๋ฐํํ Action
- count(): ์ ์ฒด ํ ๊ฐ์ ๋ฐํ.
- first() / head(): ์ฒซ ํ ๋๋ n๊ฐ์ ํ ๋ฐํ.
- take(n): ์ง์ ํ ๊ฐ์๋ง ์์ง.
- collect(): ์ ์ฒด ๋ฐ์ดํฐ๋ฅผ ๋๋ผ์ด๋ฒ๋ก ์์ง(๋ฉ๋ชจ๋ฆฌ ์ฃผ์).
cnt = df.count()
head5 = df.take(5)
one = df.first()
rows = df.collect()
4.2 ํ์/๋๋ฒ๊น Action
- show(): ์ง์ ํ ๊ฐ์๋ง ์ฝ์์ ์ถ๋ ฅ(๋ด๋ถ์ ์ผ๋ก ์์ง ํ ํฌ๋งทํ ).
df.show(20, truncate=False)
4.3 ์ ์ฅ Action (Writer API)
- write.save(): ํ์ผ ์์คํ ์ ์ ์ฅ.
- saveAsTable(): Hive/๋ฉํ์คํ ์ด์ ํ ์ด๋ธ๋ก ์ ์ฅ.
- jdbc.save(): DB์ ์ง์ ์ ์ฌ.
(df.write.mode("overwrite")
.option("compression","zstd")
.parquet("s3://bucket/mart/orders/"))
4.4 toPandas (๋๋ผ์ด๋ฒ ์์ง)
- DataFrame์ Pandas๋ก ๋ณํ.
- ์๊ท๋ชจ ๋ฐ์ดํฐ ์ํ ํ์ธ์ฉ์ผ๋ก๋ง ์ฌ์ฉ.
small = df.select("a","b").sample(0.01, seed=42).limit(10_000)
pdf = small.toPandas()
Action ํธ์ถ ํ์๋ฅผ ์ต์ํํ๊ณ , ๋์ผ DF๋ฅผ ์ฌ์ฌ์ฉํ๋ค๋ฉด cache()/persist() ํ ํ ๋ฒ๋ง ๋ฌผ๋ฆฌํํ๋ค.
5. Transformation vs Action ์นํธ์ํธ
| ๋ถ๋ฅ | ๋ํ ์ฐ์ฐ ๊ธฐ๋ฅ | ์์ฝ | ์ ํ ๊ฐ๋ฅ์ฑ |
| Transformation |
select | ์ปฌ๋ผ ์ ํ | Narrow |
| withColumn | ์ปฌ๋ผ ์์ฑ/์์ | Narrow | |
| drop | ์ปฌ๋ผ ์ ๊ฑฐ | Narrow | |
| filter | ์กฐ๊ฑด ํ ํํฐ๋ง | Narrow | |
| join | ํค ๊ธฐ๋ฐ ๊ฒฐํฉ | Wide | |
| groupBy+agg | ๊ทธ๋ฃน ์ง๊ณ | Wide | |
| distinct, dropDuplicates | ์ค๋ณต ์ ๊ฑฐ | Wide | |
| repartition | ํํฐ์ ์ฌ๋ถ๋ฐฐ | Shuffle | |
| coalesce | ํํฐ์ ์ถ์ | No-shuffle | |
| window | ํํฐ์ /์ ๋ ฌ ๊ธฐ๋ฐ ์ง๊ณ | Wide | |
| Action | count | ์ ์ฒด ํ ์ ๋ฐํ | ์คํ ํธ๋ฆฌ๊ฑฐ |
| first, head, take | ์ผ๋ถ ํ ๋ฐํ | ์คํ ํธ๋ฆฌ๊ฑฐ | |
| collect | ์ ์ฒด ๋ฐ์ดํฐ ์์ง | ์คํ ํธ๋ฆฌ๊ฑฐ | |
| show | ํ ์ถ๋ ฅ(๋๋ฒ๊น ์ฉ) | ์คํ ํธ๋ฆฌ๊ฑฐ | |
| write.save, saveAsTable, jdbc.save | ์ ์ฅ | ์คํ ํธ๋ฆฌ๊ฑฐ | |
| toPandas | Pandas ๋ณํ | ์คํ ํธ๋ฆฌ๊ฑฐ |
'๐ป Programming > Distributed Computing' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
| [Ray] ๋ถ์ฐ ์คํ ํ๋ ์์ํฌ Ray ์ค๋ช (0) | 2025.09.17 |
|---|---|
| [PySpark] Spark Job ์คํ ๊ฐ์ด๋: Ad-hoc vs Batch (0) | 2025.09.17 |
| [PySpark] ์ฑ๋ฅ ์ต์ ํ ๊ธฐ๋ณธ๊ธฐ: ๋๊ท๋ชจ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํด (0) | 2025.09.16 |
| [PySpark] ์ด๋์, ์ ์ธ๊น? | ๋์ฉ๋ ๋ฐ์ดํฐ ์ ์ฒ๋ฆฌ์ ML ํ์ฉ ๊ฐ์ด๋ (0) | 2025.09.16 |
| [PySpark] ์์ฃผ ์ฐ๋ ๊ธฐ๋ฅ ๋ฉ์๋ ์ ๋ฆฌ (0) | 2025.05.12 |