반응형
PySpark는 대용량 데이터를 다룰 때 효과적인 분산 처리 프레임워크이다. 이 글에서는 JSON 데이터 처리와 관련된 PySpark의 자주 쓰는 기능과 메서드를 중심으로 실무에서 어떻게 활용할 수 있는지 정리한다.
PySpark는 왜 SQL이 아닌 함수형 처리 방식으로 사용하나?
PySpark에서는 SQL처럼 한 줄로 복잡한 쿼리를 작성하는 대신, 테이블을 불러와 단계적으로 처리하는 방식이 일반적이다. 이런 방식은...
- 복잡한 로직을 분리해서 디버깅이 용이하다.
- 컬럼 단위 조작이 자유롭고, 재사용성이 높다.
- 중간 단계의 데이터를 쉽게 확인하고 튜닝할 수 있다.
와 같은 장점이 있다고 한다.
따라서 실무에서는 SQL 쿼리 하나로 모든 작업을 끝내기보다는, 아래와 같이 단계별 처리 방식으로 작성하는 코드 스타일이 더 선호된다.
# 예시: 조회수 필터 → 그룹 집계 → 정렬
df = spark.read.table("project_data")
filtered = df.filter(col("view_count") > 1000)
aggregated = filtered.groupBy("category").agg(avg("view_count").alias("avg_views"))
result = aggregated.orderBy(col("avg_views").desc())
PySpark 실무에서 자주 사용하는 핵심 메서드 정리
1. filter() - 원하는 조건의 데이터만 남길 때
# 조회수가 1000 이상인 데이터만 남기기
filtered_df = df.filter(col("view") >= 1000)
- 가장 많이 사용하는 함수 중 하나다. SQL의 WHERE 절과 동일하게 작동하며, null 제거, 임계값 필터링 등 모든 조건 필터에 사용된다.
- filter()는 여러 조건을 and/or로 조합하거나 .isNotNull() 등과 함께 자주 사용된다.
2. select() - 필요한 컬럼만 골라서 사용하기
selected_df = df.select("id", "title", col("view"))
- 불필요한 컬럼을 제거하고, 원하는 컬럼만 추출하는 데 사용된다. 조인 이후 컬럼 정리 시 특히 유용하다.
- 메모리 사용을 줄이고 .toPandas() 전에 꼭 필요한 컬럼만 가져오는 습관이 중요하다.
3. join() - 여러 테이블을 연결할 때
joined = df1.join(df2, df1.id == df2.foreign_id, "left")
- 관계형 데이터에서 가장 중요한 기능이다. Spark에서도 inner, left, right, outer 등 다양한 방식으로 사용할 수 있다.
- 조인 전에 컬럼명이 중복되지 않도록 alias()를 사용하거나 .select()로 명시적으로 컬럼을 지정하는 습관을 들이자.
4. groupBy().agg() - 그룹별 통계 집계
from pyspark.sql.functions import avg, count, max, min, sum
# 카테고리별 평균 조회수와 총 조회수 구하기
agg_df = df.groupBy("category").agg(
count("id").alias("cnt"),
avg("view_count").alias("avg_views"),
sum("view_count").alias("total_views"),
max("view_count").alias("max_views"),
min("view_count").alias("min_views")
)
- SQL의 GROUP BY 절과 동일한 역할을 하며, 특정 컬럼을 기준으로 데이터를 그룹화한 뒤 집계 연산을 수행할 수 있다.
- PySpark에서는 .groupBy()로 그룹을 지정한 후, .agg() 내부에 다양한 집계 함수(avg, sum, count, min, max, 등)를 적용할 수 있다
- .agg()를 쓰면 여러 개의 집계 함수를 한 번에 적용할 수 있어 효율적이다.
- .alias()를 이용해 결과 컬럼 이름을 명확히 지정해주는 것이 가독성과 후속 처리에 도움이 된다.
- groupBy().count()처럼 단일 집계 함수는 축약형도 사용할 수 있다
5. withColumn() - 컬럼 생성 또는 수정
from pyspark.sql.functions import year
with_year = df.withColumn("year", year("created_at"))
- 기존 컬럼을 수정하거나 새로운 컬럼을 생성할 때 사용된다. 날짜 처리, 조건 컬럼 생성 등 다양한 용도로 활용 가능하다.
- 새로운 컬럼이 기존 컬럼 기반으로 파생되는 경우 거의 항상 withColumn()을 사용한다.
6. explode() - 배열 컬럼을 행으로 펼치기
from pyspark.sql.functions import explode
exploded_df = df.select(
col("image_id"),
explode("annos").alias("annos")
)
- explode()는 배열 컬럼을 하나씩 분해하여 여러 개의 행으로 펼쳐주는 함수이다.
- 특히 JSON 구조 안에 리스트가 중첩되어 있는 경우, 데이터를 세분화해서 처리할 수 있게 한다.
- 위 코드를 실행하면, 원래 한 행에 여러 개의 annos가 들어 있던 데이터가, annos 하나당 한 행씩 분리된 형태로 반환된다.
7. toPandas() - Pandas 변환
small_df = df.limit(1000)
pandas_df = small_df.toPandas()
- Spark DataFrame을 Pandas DataFrame으로 변환할 때 사용한다. 단, 데이터량이 많으면 OOM이 발생할 수 있으므로 주의가 필요하다.
- 반드시 .limit() 또는 .sample()로 크기를 제어하고 사용하는 것이 안전하다.
반응형