[PySpark] ์ž์ฃผ ์“ฐ๋Š” ๊ธฐ๋Šฅ ๋ฉ”์„œ๋“œ ์ •๋ฆฌ

2025. 5. 12. 17:12ยท๐Ÿ’ป Programming/Distributed Computing
๋ฐ˜์‘ํ˜•

PySpark๋Š” ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฃฐ ๋•Œ ํšจ๊ณผ์ ์ธ ๋ถ„์‚ฐ ์ฒ˜๋ฆฌ ํ”„๋ ˆ์ž„์›Œํฌ์ด๋‹ค. ์ด ๊ธ€์—์„œ๋Š” JSON ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ์™€ ๊ด€๋ จ๋œ PySpark์˜ ์ž์ฃผ ์“ฐ๋Š” ๊ธฐ๋Šฅ๊ณผ ๋ฉ”์„œ๋“œ๋ฅผ ์ค‘์‹ฌ์œผ๋กœ ์‹ค๋ฌด์—์„œ ์–ด๋–ป๊ฒŒ ํ™œ์šฉํ•  ์ˆ˜ ์žˆ๋Š”์ง€ ์ •๋ฆฌํ•œ๋‹ค.


PySpark๋Š” ์™œ SQL์ด ์•„๋‹Œ ํ•จ์ˆ˜ํ˜• ์ฒ˜๋ฆฌ ๋ฐฉ์‹์œผ๋กœ ์‚ฌ์šฉํ•˜๋‚˜?

PySpark์—์„œ๋Š” SQL์ฒ˜๋Ÿผ ํ•œ ์ค„๋กœ ๋ณต์žกํ•œ ์ฟผ๋ฆฌ๋ฅผ ์ž‘์„ฑํ•˜๋Š” ๋Œ€์‹ , ํ…Œ์ด๋ธ”์„ ๋ถˆ๋Ÿฌ์™€ ๋‹จ๊ณ„์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹์ด ์ผ๋ฐ˜์ ์ด๋‹ค. ์ด๋Ÿฐ ๋ฐฉ์‹์€...

  • ๋ณต์žกํ•œ ๋กœ์ง์„ ๋ถ„๋ฆฌํ•ด์„œ ๋””๋ฒ„๊น…์ด ์šฉ์ดํ•˜๋‹ค.
  • ์ปฌ๋Ÿผ ๋‹จ์œ„ ์กฐ์ž‘์ด ์ž์œ ๋กญ๊ณ , ์žฌ์‚ฌ์šฉ์„ฑ์ด ๋†’๋‹ค.
  • ์ค‘๊ฐ„ ๋‹จ๊ณ„์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์‰ฝ๊ฒŒ ํ™•์ธํ•˜๊ณ  ํŠœ๋‹ํ•  ์ˆ˜ ์žˆ๋‹ค.

์™€ ๊ฐ™์€ ์žฅ์ ์ด ์žˆ๋‹ค๊ณ  ํ•œ๋‹ค.

๋”ฐ๋ผ์„œ ์‹ค๋ฌด์—์„œ๋Š” SQL ์ฟผ๋ฆฌ ํ•˜๋‚˜๋กœ ๋ชจ๋“  ์ž‘์—…์„ ๋๋‚ด๊ธฐ๋ณด๋‹ค๋Š”, ์•„๋ž˜์™€ ๊ฐ™์ด ๋‹จ๊ณ„๋ณ„ ์ฒ˜๋ฆฌ ๋ฐฉ์‹์œผ๋กœ ์ž‘์„ฑํ•˜๋Š” ์ฝ”๋“œ ์Šคํƒ€์ผ์ด ๋” ์„ ํ˜ธ๋œ๋‹ค.

# ์˜ˆ์‹œ: ์กฐํšŒ์ˆ˜ ํ•„ํ„ฐ → ๊ทธ๋ฃน ์ง‘๊ณ„ → ์ •๋ ฌ
df = spark.read.table("project_data")
filtered = df.filter(col("view_count") > 1000)
aggregated = filtered.groupBy("category").agg(avg("view_count").alias("avg_views"))
result = aggregated.orderBy(col("avg_views").desc())

 

PySpark ์‹ค๋ฌด์—์„œ ์ž์ฃผ ์‚ฌ์šฉํ•˜๋Š” ํ•ต์‹ฌ ๋ฉ”์„œ๋“œ ์ •๋ฆฌ

1. filter() - Keep only the rows that match a condition

# ์กฐํšŒ์ˆ˜๊ฐ€ 1000 ์ด์ƒ์ธ ๋ฐ์ดํ„ฐ๋งŒ ๋‚จ๊ธฐ๊ธฐ
filtered_df = df.filter(col("view") >= 1000)
  • ๊ฐ€์žฅ ๋งŽ์ด ์‚ฌ์šฉํ•˜๋Š” ํ•จ์ˆ˜ ์ค‘ ํ•˜๋‚˜๋‹ค. SQL์˜ WHERE ์ ˆ๊ณผ ๋™์ผํ•˜๊ฒŒ ์ž‘๋™ํ•˜๋ฉฐ, null ์ œ๊ฑฐ, ์ž„๊ณ„๊ฐ’ ํ•„ํ„ฐ๋ง ๋“ฑ ๋ชจ๋“  ์กฐ๊ฑด ํ•„ํ„ฐ์— ์‚ฌ์šฉ๋œ๋‹ค.
  • filter()๋Š” ์—ฌ๋Ÿฌ ์กฐ๊ฑด์„ and/or๋กœ ์กฐํ•ฉํ•˜๊ฑฐ๋‚˜ .isNotNull() ๋“ฑ๊ณผ ํ•จ๊ป˜ ์ž์ฃผ ์‚ฌ์šฉ๋œ๋‹ค.

 

2. select() - Pick out only the columns you need

from pyspark.sql.functions import col

selected_df = df.select("id", "title", col("view"))
  • Used to drop unneeded columns and keep only the ones you want. Especially handy for cleaning up columns after a join.
  • Selecting only the necessary columns before calling .toPandas() reduces memory usage and is a habit worth building.

 

3. join() - Connect multiple tables

joined = df1.join(df2, df1.id == df2.foreign_id, "left")
  • ๊ด€๊ณ„ํ˜• ๋ฐ์ดํ„ฐ์—์„œ ๊ฐ€์žฅ ์ค‘์š”ํ•œ ๊ธฐ๋Šฅ์ด๋‹ค. Spark์—์„œ๋„ inner, left, right, outer ๋“ฑ ๋‹ค์–‘ํ•œ ๋ฐฉ์‹์œผ๋กœ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ์กฐ์ธ ์ „์— ์ปฌ๋Ÿผ๋ช…์ด ์ค‘๋ณต๋˜์ง€ ์•Š๋„๋ก alias()๋ฅผ ์‚ฌ์šฉํ•˜๊ฑฐ๋‚˜ .select()๋กœ ๋ช…์‹œ์ ์œผ๋กœ ์ปฌ๋Ÿผ์„ ์ง€์ •ํ•˜๋Š” ์Šต๊ด€์„ ๋“ค์ด์ž.

 

4. groupBy().agg() - Per-group aggregate statistics

from pyspark.sql.functions import avg, count, max, min, sum

# ์นดํ…Œ๊ณ ๋ฆฌ๋ณ„ ํ‰๊ท  ์กฐํšŒ์ˆ˜์™€ ์ด ์กฐํšŒ์ˆ˜ ๊ตฌํ•˜๊ธฐ
agg_df = df.groupBy("category").agg(
    count("id").alias("cnt"),
    avg("view_count").alias("avg_views"),
    sum("view_count").alias("total_views"),
    max("view_count").alias("max_views"),
    min("view_count").alias("min_views")
)
  • SQL์˜ GROUP BY ์ ˆ๊ณผ ๋™์ผํ•œ ์—ญํ• ์„ ํ•˜๋ฉฐ, ํŠน์ • ์ปฌ๋Ÿผ์„ ๊ธฐ์ค€์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๊ทธ๋ฃนํ™”ํ•œ ๋’ค ์ง‘๊ณ„ ์—ฐ์‚ฐ์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.
  • PySpark์—์„œ๋Š” .groupBy()๋กœ ๊ทธ๋ฃน์„ ์ง€์ •ํ•œ ํ›„, .agg() ๋‚ด๋ถ€์— ๋‹ค์–‘ํ•œ ์ง‘๊ณ„ ํ•จ์ˆ˜(avg, sum, count, min, max, ๋“ฑ)๋ฅผ ์ ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค
  • .agg()๋ฅผ ์“ฐ๋ฉด ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์ง‘๊ณ„ ํ•จ์ˆ˜๋ฅผ ํ•œ ๋ฒˆ์— ์ ์šฉํ•  ์ˆ˜ ์žˆ์–ด ํšจ์œจ์ ์ด๋‹ค.
  • .alias()๋ฅผ ์ด์šฉํ•ด ๊ฒฐ๊ณผ ์ปฌ๋Ÿผ ์ด๋ฆ„์„ ๋ช…ํ™•ํžˆ ์ง€์ •ํ•ด์ฃผ๋Š” ๊ฒƒ์ด ๊ฐ€๋…์„ฑ๊ณผ ํ›„์† ์ฒ˜๋ฆฌ์— ๋„์›€์ด ๋œ๋‹ค.
  • groupBy().count()์ฒ˜๋Ÿผ ๋‹จ์ผ ์ง‘๊ณ„ ํ•จ์ˆ˜๋Š” ์ถ•์•ฝํ˜•๋„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค

 

5. withColumn() - Create or modify a column

from pyspark.sql.functions import year

with_year = df.withColumn("year", year("created_at"))
  • ๊ธฐ์กด ์ปฌ๋Ÿผ์„ ์ˆ˜์ •ํ•˜๊ฑฐ๋‚˜ ์ƒˆ๋กœ์šด ์ปฌ๋Ÿผ์„ ์ƒ์„ฑํ•  ๋•Œ ์‚ฌ์šฉ๋œ๋‹ค. ๋‚ ์งœ ์ฒ˜๋ฆฌ, ์กฐ๊ฑด ์ปฌ๋Ÿผ ์ƒ์„ฑ ๋“ฑ ๋‹ค์–‘ํ•œ ์šฉ๋„๋กœ ํ™œ์šฉ ๊ฐ€๋Šฅํ•˜๋‹ค.
  • ์ƒˆ๋กœ์šด ์ปฌ๋Ÿผ์ด ๊ธฐ์กด ์ปฌ๋Ÿผ ๊ธฐ๋ฐ˜์œผ๋กœ ํŒŒ์ƒ๋˜๋Š” ๊ฒฝ์šฐ ๊ฑฐ์˜ ํ•ญ์ƒ withColumn()์„ ์‚ฌ์šฉํ•œ๋‹ค.

 

6. explode() - Flatten an array column into rows

from pyspark.sql.functions import col, explode

exploded_df = df.select(
    col("image_id"),
    explode("annos").alias("annos")
)
  • explode() breaks an array column apart element by element, expanding it into multiple rows.
  • It is especially useful when a list is nested inside a JSON structure, because it lets you process the data at a finer granularity.
  • Running the code above turns data where one row held several annos into data with one row per anno.

 

7. toPandas() - Convert to a Pandas DataFrame

small_df = df.limit(1000)
pandas_df = small_df.toPandas()
  • Spark DataFrame์„ Pandas DataFrame์œผ๋กœ ๋ณ€ํ™˜ํ•  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค. ๋‹จ, ๋ฐ์ดํ„ฐ๋Ÿ‰์ด ๋งŽ์œผ๋ฉด OOM์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ ์ฃผ์˜๊ฐ€ ํ•„์š”ํ•˜๋‹ค.
  • ๋ฐ˜๋“œ์‹œ .limit() ๋˜๋Š” .sample()๋กœ ํฌ๊ธฐ๋ฅผ ์ œ์–ดํ•˜๊ณ  ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์•ˆ์ „ํ•˜๋‹ค.
๋ฐ˜์‘ํ˜•

'๐Ÿ’ป Programming > Distributed Computing' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€

[Ray] ๋ถ„์‚ฐ ์‹คํ–‰ ํ”„๋ ˆ์ž„์›Œํฌ Ray ์„ค๋ช…  (0) 2025.09.17
[PySpark] Spark Job ์‹คํ–‰ ๊ฐ€์ด๋“œ: Ad-hoc vs Batch  (0) 2025.09.17
[PySpark] ์ฃผ์š” ์—ฐ์‚ฐ ๊ฐ€์ด๋“œ: Transformation, Action  (0) 2025.09.16
[PySpark] ์„ฑ๋Šฅ ์ตœ์ ํ™” ๊ธฐ๋ณธ๊ธฐ: ๋Œ€๊ทœ๋ชจ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด  (0) 2025.09.16
[PySpark] ์–ด๋””์—, ์™œ ์“ธ๊นŒ? | ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ ์ „์ฒ˜๋ฆฌ์™€ ML ํ™œ์šฉ ๊ฐ€์ด๋“œ  (0) 2025.09.16
'๐Ÿ’ป Programming/Distributed Computing' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€
  • [PySpark] Spark Job ์‹คํ–‰ ๊ฐ€์ด๋“œ: Ad-hoc vs Batch
  • [PySpark] ์ฃผ์š” ์—ฐ์‚ฐ ๊ฐ€์ด๋“œ: Transformation, Action
  • [PySpark] ์„ฑ๋Šฅ ์ตœ์ ํ™” ๊ธฐ๋ณธ๊ธฐ: ๋Œ€๊ทœ๋ชจ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด
  • [PySpark] ์–ด๋””์—, ์™œ ์“ธ๊นŒ? | ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ ์ „์ฒ˜๋ฆฌ์™€ ML ํ™œ์šฉ ๊ฐ€์ด๋“œ
๋ญ…์ฆค
๋ญ…์ฆค
AI ๊ธฐ์ˆ  ๋ธ”๋กœ๊ทธ
    ๋ฐ˜์‘ํ˜•
  • ๋ญ…์ฆค
    moovzi’s Doodle
    ๋ญ…์ฆค
  • ์ „์ฒด
    ์˜ค๋Š˜
    ์–ด์ œ
  • ๊ณต์ง€์‚ฌํ•ญ

    • โœจ About Me
    • ๋ถ„๋ฅ˜ ์ „์ฒด๋ณด๊ธฐ (216)
      • ๐Ÿ“– Fundamentals (34)
        • Computer Vision (9)
        • 3D vision & Graphics (6)
        • AI & ML (16)
        • etc. (3)
      • ๐Ÿ› Research (78)
        • Deep Learning (7)
        • Perception (19)
        • OCR (7)
        • Multi-modal (8)
        • Image•Video Generation (18)
        • 3D Vision (4)
        • Material • Texture Recognit.. (8)
        • Large-scale Model (7)
        • etc. (0)
      • ๐Ÿ› ๏ธ Engineering (8)
        • Distributed Training & Infe.. (5)
        • AI & ML ์ธ์‚ฌ์ดํŠธ (3)
      • ๐Ÿ’ป Programming (92)
        • Python (18)
        • Computer Vision (12)
        • LLM (4)
        • AI & ML (18)
        • Database (3)
        • Distributed Computing (6)
        • Apache Airflow (6)
        • Docker & Kubernetes (14)
        • ์ฝ”๋”ฉ ํ…Œ์ŠคํŠธ (4)
        • etc. (7)
      • ๐Ÿ’ฌ ETC (4)
        • ์ฑ… ๋ฆฌ๋ทฐ (4)
  • ๋งํฌ

    • ๋ฆฌํ‹€๋ฆฌ ํ”„๋กœํ•„ (๋ฉ˜ํ† ๋ง, ๋ฉด์ ‘์ฑ…,...)
    • ใ€Ž๋‚˜๋Š” AI ์—”์ง€๋‹ˆ์–ด์ž…๋‹ˆ๋‹คใ€
    • Instagram
    • Brunch
    • Github
  • ์ธ๊ธฐ ๊ธ€

  • ์ตœ๊ทผ ๋Œ“๊ธ€

  • ์ตœ๊ทผ ๊ธ€

  • hELLOยท Designed By์ •์ƒ์šฐ.v4.10.3
๋ญ…์ฆค
[PySpark] ์ž์ฃผ ์“ฐ๋Š” ๊ธฐ๋Šฅ ๋ฉ”์„œ๋“œ ์ •๋ฆฌ
์ƒ๋‹จ์œผ๋กœ

ํ‹ฐ์Šคํ† ๋ฆฌํˆด๋ฐ”