๋ฐ์ํ

PySpark๋ ๋์ฉ๋ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃฐ ๋ ํจ๊ณผ์ ์ธ ๋ถ์ฐ ์ฒ๋ฆฌ ํ๋ ์์ํฌ์ด๋ค. ์ด ๊ธ์์๋ JSON ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ๊ด๋ จ๋ PySpark์ ์์ฃผ ์ฐ๋ ๊ธฐ๋ฅ๊ณผ ๋ฉ์๋๋ฅผ ์ค์ฌ์ผ๋ก ์ค๋ฌด์์ ์ด๋ป๊ฒ ํ์ฉํ ์ ์๋์ง ์ ๋ฆฌํ๋ค.
PySpark๋ ์ SQL์ด ์๋ ํจ์ํ ์ฒ๋ฆฌ ๋ฐฉ์์ผ๋ก ์ฌ์ฉํ๋?
PySpark์์๋ SQL์ฒ๋ผ ํ ์ค๋ก ๋ณต์กํ ์ฟผ๋ฆฌ๋ฅผ ์์ฑํ๋ ๋์ , ํ ์ด๋ธ์ ๋ถ๋ฌ์ ๋จ๊ณ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ด ์ผ๋ฐ์ ์ด๋ค. ์ด๋ฐ ๋ฐฉ์์...
- ๋ณต์กํ ๋ก์ง์ ๋ถ๋ฆฌํด์ ๋๋ฒ๊น ์ด ์ฉ์ดํ๋ค.
- ์ปฌ๋ผ ๋จ์ ์กฐ์์ด ์์ ๋กญ๊ณ , ์ฌ์ฌ์ฉ์ฑ์ด ๋๋ค.
- ์ค๊ฐ ๋จ๊ณ์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ฒ ํ์ธํ๊ณ ํ๋ํ ์ ์๋ค.
์ ๊ฐ์ ์ฅ์ ์ด ์๋ค๊ณ ํ๋ค.
๋ฐ๋ผ์ ์ค๋ฌด์์๋ SQL ์ฟผ๋ฆฌ ํ๋๋ก ๋ชจ๋ ์์ ์ ๋๋ด๊ธฐ๋ณด๋ค๋, ์๋์ ๊ฐ์ด ๋จ๊ณ๋ณ ์ฒ๋ฆฌ ๋ฐฉ์์ผ๋ก ์์ฑํ๋ ์ฝ๋ ์คํ์ผ์ด ๋ ์ ํธ๋๋ค.
# ์์: ์กฐํ์ ํํฐ → ๊ทธ๋ฃน ์ง๊ณ → ์ ๋ ฌ
df = spark.read.table("project_data")
filtered = df.filter(col("view_count") > 1000)
aggregated = filtered.groupBy("category").agg(avg("view_count").alias("avg_views"))
result = aggregated.orderBy(col("avg_views").desc())
PySpark ์ค๋ฌด์์ ์์ฃผ ์ฌ์ฉํ๋ ํต์ฌ ๋ฉ์๋ ์ ๋ฆฌ
1. filter() - ์ํ๋ ์กฐ๊ฑด์ ๋ฐ์ดํฐ๋ง ๋จ๊ธธ ๋
# ์กฐํ์๊ฐ 1000 ์ด์์ธ ๋ฐ์ดํฐ๋ง ๋จ๊ธฐ๊ธฐ
filtered_df = df.filter(col("view") >= 1000)
- ๊ฐ์ฅ ๋ง์ด ์ฌ์ฉํ๋ ํจ์ ์ค ํ๋๋ค. SQL์ WHERE ์ ๊ณผ ๋์ผํ๊ฒ ์๋ํ๋ฉฐ, null ์ ๊ฑฐ, ์๊ณ๊ฐ ํํฐ๋ง ๋ฑ ๋ชจ๋ ์กฐ๊ฑด ํํฐ์ ์ฌ์ฉ๋๋ค.
- filter()๋ ์ฌ๋ฌ ์กฐ๊ฑด์ and/or๋ก ์กฐํฉํ๊ฑฐ๋ .isNotNull() ๋ฑ๊ณผ ํจ๊ป ์์ฃผ ์ฌ์ฉ๋๋ค.
2. select() - ํ์ํ ์ปฌ๋ผ๋ง ๊ณจ๋ผ์ ์ฌ์ฉํ๊ธฐ
selected_df = df.select("id", "title", col("view"))
- ๋ถํ์ํ ์ปฌ๋ผ์ ์ ๊ฑฐํ๊ณ , ์ํ๋ ์ปฌ๋ผ๋ง ์ถ์ถํ๋ ๋ฐ ์ฌ์ฉ๋๋ค. ์กฐ์ธ ์ดํ ์ปฌ๋ผ ์ ๋ฆฌ ์ ํนํ ์ ์ฉํ๋ค.
- ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ์ ์ค์ด๊ณ .toPandas() ์ ์ ๊ผญ ํ์ํ ์ปฌ๋ผ๋ง ๊ฐ์ ธ์ค๋ ์ต๊ด์ด ์ค์ํ๋ค.
3. join() - ์ฌ๋ฌ ํ ์ด๋ธ์ ์ฐ๊ฒฐํ ๋
joined = df1.join(df2, df1.id == df2.foreign_id, "left")
- ๊ด๊ณํ ๋ฐ์ดํฐ์์ ๊ฐ์ฅ ์ค์ํ ๊ธฐ๋ฅ์ด๋ค. Spark์์๋ inner, left, right, outer ๋ฑ ๋ค์ํ ๋ฐฉ์์ผ๋ก ์ฌ์ฉํ ์ ์๋ค.
- ์กฐ์ธ ์ ์ ์ปฌ๋ผ๋ช ์ด ์ค๋ณต๋์ง ์๋๋ก alias()๋ฅผ ์ฌ์ฉํ๊ฑฐ๋ .select()๋ก ๋ช ์์ ์ผ๋ก ์ปฌ๋ผ์ ์ง์ ํ๋ ์ต๊ด์ ๋ค์ด์.
4. groupBy().agg() - ๊ทธ๋ฃน๋ณ ํต๊ณ ์ง๊ณ
from pyspark.sql.functions import avg, count, max, min, sum
# ์นดํ
๊ณ ๋ฆฌ๋ณ ํ๊ท ์กฐํ์์ ์ด ์กฐํ์ ๊ตฌํ๊ธฐ
agg_df = df.groupBy("category").agg(
count("id").alias("cnt"),
avg("view_count").alias("avg_views"),
sum("view_count").alias("total_views"),
max("view_count").alias("max_views"),
min("view_count").alias("min_views")
)
- SQL์ GROUP BY ์ ๊ณผ ๋์ผํ ์ญํ ์ ํ๋ฉฐ, ํน์ ์ปฌ๋ผ์ ๊ธฐ์ค์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๊ทธ๋ฃนํํ ๋ค ์ง๊ณ ์ฐ์ฐ์ ์ํํ ์ ์๋ค.
- PySpark์์๋ .groupBy()๋ก ๊ทธ๋ฃน์ ์ง์ ํ ํ, .agg() ๋ด๋ถ์ ๋ค์ํ ์ง๊ณ ํจ์(avg, sum, count, min, max, ๋ฑ)๋ฅผ ์ ์ฉํ ์ ์๋ค
- .agg()๋ฅผ ์ฐ๋ฉด ์ฌ๋ฌ ๊ฐ์ ์ง๊ณ ํจ์๋ฅผ ํ ๋ฒ์ ์ ์ฉํ ์ ์์ด ํจ์จ์ ์ด๋ค.
- .alias()๋ฅผ ์ด์ฉํด ๊ฒฐ๊ณผ ์ปฌ๋ผ ์ด๋ฆ์ ๋ช ํํ ์ง์ ํด์ฃผ๋ ๊ฒ์ด ๊ฐ๋ ์ฑ๊ณผ ํ์ ์ฒ๋ฆฌ์ ๋์์ด ๋๋ค.
- groupBy().count()์ฒ๋ผ ๋จ์ผ ์ง๊ณ ํจ์๋ ์ถ์ฝํ๋ ์ฌ์ฉํ ์ ์๋ค
5. withColumn() - ์ปฌ๋ผ ์์ฑ ๋๋ ์์
from pyspark.sql.functions import year
with_year = df.withColumn("year", year("created_at"))
- ๊ธฐ์กด ์ปฌ๋ผ์ ์์ ํ๊ฑฐ๋ ์๋ก์ด ์ปฌ๋ผ์ ์์ฑํ ๋ ์ฌ์ฉ๋๋ค. ๋ ์ง ์ฒ๋ฆฌ, ์กฐ๊ฑด ์ปฌ๋ผ ์์ฑ ๋ฑ ๋ค์ํ ์ฉ๋๋ก ํ์ฉ ๊ฐ๋ฅํ๋ค.
- ์๋ก์ด ์ปฌ๋ผ์ด ๊ธฐ์กด ์ปฌ๋ผ ๊ธฐ๋ฐ์ผ๋ก ํ์๋๋ ๊ฒฝ์ฐ ๊ฑฐ์ ํญ์ withColumn()์ ์ฌ์ฉํ๋ค.
6. explode() - ๋ฐฐ์ด ์ปฌ๋ผ์ ํ์ผ๋ก ํผ์น๊ธฐ
from pyspark.sql.functions import explode
exploded_df = df.select(
col("image_id"),
explode("annos").alias("annos")
)
- explode()๋ ๋ฐฐ์ด ์ปฌ๋ผ์ ํ๋์ฉ ๋ถํดํ์ฌ ์ฌ๋ฌ ๊ฐ์ ํ์ผ๋ก ํผ์ณ์ฃผ๋ ํจ์์ด๋ค.
- ํนํ JSON ๊ตฌ์กฐ ์์ ๋ฆฌ์คํธ๊ฐ ์ค์ฒฉ๋์ด ์๋ ๊ฒฝ์ฐ, ๋ฐ์ดํฐ๋ฅผ ์ธ๋ถํํด์ ์ฒ๋ฆฌํ ์ ์๊ฒ ํ๋ค.
- ์ ์ฝ๋๋ฅผ ์คํํ๋ฉด, ์๋ ํ ํ์ ์ฌ๋ฌ ๊ฐ์ annos๊ฐ ๋ค์ด ์๋ ๋ฐ์ดํฐ๊ฐ, annos ํ๋๋น ํ ํ์ฉ ๋ถ๋ฆฌ๋ ํํ๋ก ๋ฐํ๋๋ค.
7. toPandas() - Pandas ๋ณํ
small_df = df.limit(1000)
pandas_df = small_df.toPandas()
- Spark DataFrame์ Pandas DataFrame์ผ๋ก ๋ณํํ ๋ ์ฌ์ฉํ๋ค. ๋จ, ๋ฐ์ดํฐ๋์ด ๋ง์ผ๋ฉด OOM์ด ๋ฐ์ํ ์ ์์ผ๋ฏ๋ก ์ฃผ์๊ฐ ํ์ํ๋ค.
- ๋ฐ๋์ .limit() ๋๋ .sample()๋ก ํฌ๊ธฐ๋ฅผ ์ ์ดํ๊ณ ์ฌ์ฉํ๋ ๊ฒ์ด ์์ ํ๋ค.
๋ฐ์ํ
'๐ป Programming > Distributed Computing' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
| [Ray] ๋ถ์ฐ ์คํ ํ๋ ์์ํฌ Ray ์ค๋ช (0) | 2025.09.17 |
|---|---|
| [PySpark] Spark Job ์คํ ๊ฐ์ด๋: Ad-hoc vs Batch (0) | 2025.09.17 |
| [PySpark] ์ฃผ์ ์ฐ์ฐ ๊ฐ์ด๋: Transformation, Action (0) | 2025.09.16 |
| [PySpark] ์ฑ๋ฅ ์ต์ ํ ๊ธฐ๋ณธ๊ธฐ: ๋๊ท๋ชจ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํด (0) | 2025.09.16 |
| [PySpark] ์ด๋์, ์ ์ธ๊น? | ๋์ฉ๋ ๋ฐ์ดํฐ ์ ์ฒ๋ฆฌ์ ML ํ์ฉ ๊ฐ์ด๋ (0) | 2025.09.16 |