[PySpark] Core Operations Guide: Transformations and Actions

2025-09-16 16:54 · 💻 Programming/Distributed Computing
๋ฐ˜์‘ํ˜•

1. Overview

PySpark operations fall into two broad classes: Transformations and Actions. Understanding this distinction tells you where performance costs arise and which code actually triggers execution, which cuts down on trial and error during preprocessing and training-data preparation.

 

2. Transformations vs Actions

2.1 Definitions

  • Transformation: returns a new DataFrame but does not execute immediately (lazy). Spark only records the lineage (the history of operations).
  • Action: makes Spark run an actual Job, returning a result or writing it to external storage.

2.2 Execution model in brief

  • No Job is created until an Action is encountered.
  • The moment an Action is called, the DAG is broken down into Jobs → Stages (split at shuffle boundaries) → Tasks (one per partition) and executed.

Calling an Action multiple times on the same DataFrame re-executes the same lineage each time. Materializing intermediate results you plan to reuse with cache()/persist() and a single Action is the first step of performance tuning.

 

3. Transformation operations

All of the operations below are lazy. For each one we also note whether it triggers a shuffle (narrow vs wide transformation).

3.1 select / withColumn / drop (Narrow)

  • select: keeps only the columns you want → drops unneeded columns early.
  • withColumn: creates a new column or modifies an existing one.
  • drop: removes specific columns.

```python
from pyspark.sql.functions import col, when

base = df.select("id", "user_id", "price", "ts")
feat = base.withColumn("bucket", when(col("price") < 50, "low").otherwise("high"))
```

3.2 filter (Narrow)

  • Keeps only the rows matching a condition.
  • Apply it before joins and aggregations to shrink the data early.

```python
recent = df.filter(col("ts") >= cutoff)
```

3.3 join (Wide, Shuffle)

  • Combines two DataFrames on a key.
  • Can trigger a large shuffle; wrap small tables in broadcast() to avoid it.

```python
from pyspark.sql.functions import broadcast

# Select the needed columns first, then join against that reference
# so the join condition refers to the broadcasted DataFrame.
dim_small = dim.select("id", "name")
joined = fact.join(broadcast(dim_small), fact.dim_id == dim_small.id, "left")
```
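Why does broadcasting help? Conceptually, each executor receives a full copy of the small table and performs a local hash join, so the large table never has to be shuffled. A plain-Python sketch of that local hash join (an illustration, not Spark internals):

```python
# Broadcast hash join, conceptually: build a hash map from the small
# (broadcasted) table once, then probe it row by row from the large table.
def broadcast_hash_join(fact_rows, dim_rows, fact_key, dim_key):
    # build phase: index the small table by its join key
    index = {row[dim_key]: row for row in dim_rows}
    # probe phase: stream the large table, no data movement needed
    out = []
    for row in fact_rows:
        match = index.get(row[fact_key])      # None → left-join miss
        out.append({**row, **(match or {})})
    return out

fact_rows = [{"order": 1, "dim_id": 10}, {"order": 2, "dim_id": 99}]
dim_rows = [{"id": 10, "name": "books"}]
print(broadcast_hash_join(fact_rows, dim_rows, "dim_id", "id"))
```

A shuffle join, by contrast, must move both tables' rows across the network so that matching keys land on the same executor, which is why broadcasting the small side is usually the cheaper plan.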

3.4 groupBy + agg (Wide, Shuffle)

  • groupBy(): groups rows by the given columns; those columns become the group key, and rows sharing the same key values form one group.
  • agg(): applies aggregate functions (avg, sum, count, max, min, …) to each group.

Example input:

| user_id | category | price |
|---------|----------|-------|
| 1       | A        | 100   |
| 2       | A        | 200   |
| 3       | B        | 300   |
| 4       | B        | 400   |
| 5       | A        | 50    |

```python
from pyspark.sql.functions import avg, sum

agg = df.groupBy("category").agg(
    avg("price").alias("avg_price"),
    sum("price").alias("total_price")
)
agg.show()
```

Result:

| category | avg_price | total_price |
|----------|-----------|-------------|
| A        | 116.67    | 350         |
| B        | 350.00    | 700         |

 

  • In the example above, rows are grouped by the category column,
  • and avg and sum produce per-group average and total columns over rows sharing the same category value.
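The numbers above can be verified with the same grouping logic in plain Python; for example, the average for A is (100 + 200 + 50) / 3 ≈ 116.67:

```python
from collections import defaultdict

rows = [(1, "A", 100), (2, "A", 200), (3, "B", 300), (4, "B", 400), (5, "A", 50)]

# group prices by category (what groupBy does), then aggregate each group
groups = defaultdict(list)
for _, category, price in rows:
    groups[category].append(price)

agg = {c: (round(sum(p) / len(p), 2), sum(p)) for c, p in groups.items()}
print(agg)  # {'A': (116.67, 350), 'B': (350.0, 700)}
```

In Spark the same computation is distributed: partial aggregates are computed per partition, then shuffled by key and merged, which is why groupBy is a wide transformation.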

3.5 distinct / dropDuplicates (Wide, Shuffle)

  • distinct: removes rows duplicated across all columns.
  • dropDuplicates: removes duplicates based on specific columns.

```python
uniq = df.dropDuplicates(["user_id", "date"])
```
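The difference in plain-Python terms (illustration only): distinct compares whole rows, while dropDuplicates compares only the key columns and keeps one row per key (in Spark, which row survives per key is not guaranteed).

```python
rows = [
    {"user_id": 1, "date": "2025-09-16", "price": 100},
    {"user_id": 1, "date": "2025-09-16", "price": 100},  # exact duplicate row
    {"user_id": 1, "date": "2025-09-16", "price": 999},  # same key, new price
]

# distinct: dedupe on the entire row
distinct_rows = [dict(t) for t in {tuple(sorted(r.items())) for r in rows}]

# dropDuplicates(["user_id", "date"]): dedupe on key columns only,
# keeping the first row seen per key in this sketch
seen, dropdup_rows = set(), []
for r in rows:
    key = (r["user_id"], r["date"])
    if key not in seen:
        seen.add(key)
        dropdup_rows.append(r)

print(len(distinct_rows), len(dropdup_rows))  # 2 1
```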

3.6 repartition / coalesce (Shuffle / No-shuffle)

  • repartition(N): triggers a shuffle to redistribute data evenly across N partitions.
  • coalesce(N): only reduces the partition count, without a shuffle (beware of reduced parallelism).

```python
wide = df.repartition(800)
compact = wide.coalesce(100)
```
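A rough picture of the difference, sketched in plain Python with partitions as lists (not Spark's actual partitioner): repartition moves individual elements to new partitions (a shuffle, so skew is evened out), while coalesce merely merges whole existing partitions (no data movement between elements, so skew can survive).

```python
# Toy partitions: each inner list is one partition's data (skewed input).
parts = [[1, 2, 3, 4, 5, 6], [], [7], []]

def repartition(parts, n):
    # shuffle: every element may land in a new partition (round-robin here)
    flat = [x for p in parts for x in p]
    return [flat[i::n] for i in range(n)]

def coalesce(parts, n):
    # no shuffle: merge whole partitions into n buckets; elements stay together
    out = [[] for _ in range(n)]
    for i, p in enumerate(parts):
        out[i % n].extend(p)
    return out

print([len(p) for p in repartition(parts, 2)])  # balanced: [4, 3]
print([len(p) for p in coalesce(parts, 2)])     # merged as-is: [7, 0]
```

This is why coalesce is cheap but can leave partitions unbalanced, and why repartition is the right tool when the data itself is skewed.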

3.7 Window functions (Wide, Shuffle possible)

  • Aggregate over a per-partition, ordered context of surrounding rows.
  • Useful for rankings, cumulative sums, extracting the latest record, etc.

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Latest row per user: number rows by timestamp descending within each user
w = Window.partitionBy("user_id").orderBy(col("ts").desc())
latest = df.withColumn("rn", row_number().over(w)).filter("rn = 1")
```
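What row_number over that window computes, in plain-Python terms (an illustration): within each user_id partition, rows are ordered by ts descending and numbered 1, 2, …, so keeping rn == 1 keeps the newest row per user.

```python
from itertools import groupby

rows = [
    {"user_id": 1, "ts": 10, "event": "a"},
    {"user_id": 1, "ts": 30, "event": "b"},
    {"user_id": 2, "ts": 20, "event": "c"},
]

# partitionBy("user_id") + orderBy(ts desc) → sort, group, take the head
ordered = sorted(rows, key=lambda r: (r["user_id"], -r["ts"]))
latest = [next(g) for _, g in groupby(ordered, key=lambda r: r["user_id"])]
print([r["event"] for r in latest])  # ['b', 'c']
```

In Spark the partitionBy clause may require a shuffle so that each user's rows are co-located before ordering, which is why window functions are listed as wide.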

 

 

4. Action operations

An Action is any operation that collects results, saves them, or prints them.

4.1 Result-returning Actions

  • count(): returns the total row count.
  • first() / head(): returns the first row (or the first n rows).
  • take(n): collects only the specified number of rows.
  • collect(): pulls the entire dataset to the driver (watch driver memory).

```python
cnt   = df.count()
head5 = df.take(5)
one   = df.first()
rows  = df.collect()
```

4.2 Display / debugging Actions

  • show(): prints the given number of rows to the console (it fetches just those rows to the driver and formats them).

```python
df.show(20, truncate=False)
```

4.3 Save Actions (Writer API)

  • write.save(): writes to a file system.
  • saveAsTable(): saves as a table in Hive / the metastore.
  • write.jdbc(): loads directly into a database.

```python
(df.write.mode("overwrite")
    .option("compression", "zstd")
    .parquet("s3://bucket/mart/orders/"))
```

4.4 toPandas (driver-side collection)

  • Converts a DataFrame to pandas.
  • Use only for inspecting small samples; the full result must fit in driver memory.

```python
small = df.select("a", "b").sample(0.01, seed=42).limit(10_000)
pdf = small.toPandas()
```

Minimize the number of Action calls, and if you reuse the same DataFrame, cache()/persist() it and materialize it with a single Action.

 

 

5. Transformation vs Action cheat sheet

| Category       | Operations                          | Summary                           | Shuffle        |
|----------------|-------------------------------------|-----------------------------------|----------------|
| Transformation | select                              | column selection                  | Narrow         |
|                | withColumn                          | create/modify column              | Narrow         |
|                | drop                                | column removal                    | Narrow         |
|                | filter                              | row filtering by condition        | Narrow         |
|                | join                                | key-based combination             | Wide           |
|                | groupBy + agg                       | grouped aggregation               | Wide           |
|                | distinct, dropDuplicates            | duplicate removal                 | Wide           |
|                | repartition                         | partition redistribution          | Shuffle        |
|                | coalesce                            | partition reduction               | No-shuffle     |
|                | window                              | partition/order-based aggregation | Wide           |
| Action         | count                               | total row count                   | triggers a Job |
|                | first, head, take                   | return some rows                  | triggers a Job |
|                | collect                             | collect all data                  | triggers a Job |
|                | show                                | print rows (debugging)            | triggers a Job |
|                | write.save, saveAsTable, write.jdbc | save                              | triggers a Job |
|                | toPandas                            | pandas conversion                 | triggers a Job |

 

 

๋ฐ˜์‘ํ˜•

'๐Ÿ’ป Programming > Distributed Computing' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€

[Ray] ๋ถ„์‚ฐ ์‹คํ–‰ ํ”„๋ ˆ์ž„์›Œํฌ Ray ์„ค๋ช…  (0) 2025.09.17
[PySpark] Spark Job ์‹คํ–‰ ๊ฐ€์ด๋“œ: Ad-hoc vs Batch  (0) 2025.09.17
[PySpark] ์„ฑ๋Šฅ ์ตœ์ ํ™” ๊ธฐ๋ณธ๊ธฐ: ๋Œ€๊ทœ๋ชจ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด  (0) 2025.09.16
[PySpark] ์–ด๋””์—, ์™œ ์“ธ๊นŒ? | ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ ์ „์ฒ˜๋ฆฌ์™€ ML ํ™œ์šฉ ๊ฐ€์ด๋“œ  (0) 2025.09.16
[PySpark] ์ž์ฃผ ์“ฐ๋Š” ๊ธฐ๋Šฅ ๋ฉ”์„œ๋“œ ์ •๋ฆฌ  (0) 2025.05.12
'๐Ÿ’ป Programming/Distributed Computing' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€
  • [Ray] ๋ถ„์‚ฐ ์‹คํ–‰ ํ”„๋ ˆ์ž„์›Œํฌ Ray ์„ค๋ช…
  • [PySpark] Spark Job ์‹คํ–‰ ๊ฐ€์ด๋“œ: Ad-hoc vs Batch
  • [PySpark] ์„ฑ๋Šฅ ์ตœ์ ํ™” ๊ธฐ๋ณธ๊ธฐ: ๋Œ€๊ทœ๋ชจ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด
  • [PySpark] ์–ด๋””์—, ์™œ ์“ธ๊นŒ? | ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ ์ „์ฒ˜๋ฆฌ์™€ ML ํ™œ์šฉ ๊ฐ€์ด๋“œ
๋ญ…์ฆค
๋ญ…์ฆค
AI ๊ธฐ์ˆ  ๋ธ”๋กœ๊ทธ
    ๋ฐ˜์‘ํ˜•
  • ๋ญ…์ฆค
    moovzi’s Doodle
    ๋ญ…์ฆค
  • ์ „์ฒด
    ์˜ค๋Š˜
    ์–ด์ œ
  • ๊ณต์ง€์‚ฌํ•ญ

    • โœจ About Me
    • ๋ถ„๋ฅ˜ ์ „์ฒด๋ณด๊ธฐ (216)
      • ๐Ÿ“– Fundamentals (34)
        • Computer Vision (9)
        • 3D vision & Graphics (6)
        • AI & ML (16)
        • etc. (3)
      • ๐Ÿ› Research (78)
        • Deep Learning (7)
        • Perception (19)
        • OCR (7)
        • Multi-modal (8)
        • Image•Video Generation (18)
        • 3D Vision (4)
        • Material • Texture Recognit.. (8)
        • Large-scale Model (7)
        • etc. (0)
      • ๐Ÿ› ๏ธ Engineering (8)
        • Distributed Training & Infe.. (5)
        • AI & ML ์ธ์‚ฌ์ดํŠธ (3)
      • ๐Ÿ’ป Programming (92)
        • Python (18)
        • Computer Vision (12)
        • LLM (4)
        • AI & ML (18)
        • Database (3)
        • Distributed Computing (6)
        • Apache Airflow (6)
        • Docker & Kubernetes (14)
        • ์ฝ”๋”ฉ ํ…Œ์ŠคํŠธ (4)
        • etc. (7)
      • ๐Ÿ’ฌ ETC (4)
        • ์ฑ… ๋ฆฌ๋ทฐ (4)
  • ๋งํฌ

    • ๋ฆฌํ‹€๋ฆฌ ํ”„๋กœํ•„ (๋ฉ˜ํ† ๋ง, ๋ฉด์ ‘์ฑ…,...)
    • ใ€Ž๋‚˜๋Š” AI ์—”์ง€๋‹ˆ์–ด์ž…๋‹ˆ๋‹คใ€
    • Instagram
    • Brunch
    • Github
  • ์ธ๊ธฐ ๊ธ€

  • ์ตœ๊ทผ ๋Œ“๊ธ€

  • ์ตœ๊ทผ ๊ธ€

  • hELLOยท Designed By์ •์ƒ์šฐ.v4.10.3
๋ญ…์ฆค
[PySpark] ์ฃผ์š” ์—ฐ์‚ฐ ๊ฐ€์ด๋“œ: Transformation, Action
์ƒ๋‹จ์œผ๋กœ

ํ‹ฐ์Šคํ† ๋ฆฌํˆด๋ฐ”