주니어 데이터 엔지니어 우솨's 개발일지

데이터 엔지니어링 72일차 TIL 본문

데브코스

데이터 엔지니어링 72일차 TIL

우솨 2024. 7. 9. 19:03

학습 내용

Partition Pruning과 Execution Plan
- Partition Pruning은 Logical Plan Optimization 단계에서 발생한다.


Dynamic Partition Pruning이란?
: 비 Partition 테이블에 적용된 필터링을 Partition 테이블에 적용해보는 것
        - 작은 dimension테이블이라면 브로드캐스트 조인까지하면 더 좋다.
- Dynamic Partition Pruning는 기본적으로 활성화 되어 있다.
        - spark.sql.optimizer.dynamicPartitionPruning.enavled : True


Repartition을 하는 이유
- 전체적으로 파티션의 수를 늘려 병렬성을 증가시키기 위해
- 굉장히 큰 파티션이나 Skew파티션의 크기를 조절하기 위해서
- 파티션을 분석 패턴에 맞게 재분배(Write once, read many)
        - 어떤 DataFrame을 특정 컬럼 기준으로 그룹핑을 하거나 필터링을 자주 하는경우
                - 미리 그 컬럼 기준으로 저장해두었다면 그게 Bucketing.

Repartition의 방식
- repartition / repartitionByRange 두가지 존재
- Shuffling이 발생하기 때문에 분명한 이유를 가지고 Repartition을 사용해야한다.
        - 많은 경우 repartition이 별 이유없이 사용되어 오히려 시간과 비용이 증가
        - 불필요한 counting과 distinct counting과 duplicate제거가 비용 발생 !
- Column이 사용되면 균등한 파티션 크기를 보장할 수 없다.
- 파티션의 수를 줄이는 용도로는 사용불가
        - 파티션의 수를 줄일 경우에는 Coalesce 사용

1. repartition(numPartitions, *cols)
: Hash 기반 Partitioning
ex) repartition(5), repartition(5, "city"), repartition(5, "city", "zipcode")

2. repartitionByRange(numPartitions, *cols)
- 지정된 컬럼 값의 범위를 기준으로 파티션을 나누는 방식
- 데이터 샘플링 기반으로 파티션을 나누기에 결과가 매번 달라질 수 있다.
        - Nondeterministic
- 사용법 자체는 repartiton과 동일하다

Coalesce
: 파티션의 수를 줄이는 용도
- Shuffling을 발생시키지 않고 로컬 파티션들을 머지함
        - Skew 파티션을 만들어낼 수 있다.
- Column이 사용되며 균등한 파티션 크기를 보장할 수 없다.

DataFrame 힌트
: Spark SQL Optimizer에게 Execution plan을 만듬에 있어 특정한 방식을 사용하도록 제안(최적화된 방식을 변경하기 위해 사용)
DataFrame의 Partitioning 관련 힌트
- COALESCE, REPARTITION, REPARTITION_BY_RANGE, REBALANCE
ex) df1.join(df2, "id", "inner").hint("COALESCE", 3)

DataFrame의 Join 관련 힌트
- BROADCAST, BROADCASTJOIN, MAPJOIN
        - Broadcast Join 사용 제안
- MERGE, SHUFFLE_MERGE, MERGEJOIN
        - Shuffle Merge join 사용 제안
        - Spark의 기본 조인 전략
- SHUFFLE_HASH
        - Shuffle Hash Join 사용 제안
        - Full Outer Join에는 사용 불가능
- SHUFFLE_REPLICATE_NL
        - Shuffle-and-replicate (Cross Join) Join 사용 제안
ex) SELECT /*+ MERGE(df2) */*
     FROM df1 JOIN df2 ON df1.order_month = df2.year_month

AQE(Adaptive Query Execution)
: Spark Engine Optimizer가 Partition의 수를 최적화하여 결정하는 것
- Spark 3.2부터 기본적으로 활성화 되어있다.
        - spark.sql.adaptive.enabled 옵션을 true로 설정
- AQE가 필요한 경우들
        - Dynamically coalescing(Post) shuffle partitions (Spark 3)
        - Dynamically switching join strategies (Spark 3.2)
        - Dynamically optimizing skew joins (Spark 3)

AQE가 해결하려는 일반적인 문제는 무엇인가?
: Partition의 수를 최적화하여 결정하는 것의 어려움

현재 AQE로 해결할 수 있는 구체적인 문제들 3가지로는 어떤 것들이 있는가?
1. 데이터 스큐(skew): 데이터가 특정 파티션에 집중되어 발생하는 성능 저하 문제.
2. 비효율적인 파티션 크기: 너무 작거나 큰 파티션으로 인한 I/O와 처리 비용 증가 문제.
3. 비최적화된 조인 전략: 데이터 크기나 분포를 고려하지 않은 고정된 조인 방식으로 인한 성능 저하 문제.


AQE의 동작 방식

 


Dynamically coalescing shuffle partitions 동작방식
: AQE의 해법
- 내부적으로 많은 수의 파티션을 일부러 생성
        - spark.sql.sdaptive.coalescePartitions.initialPartitionNum(200)
- 매 Stage가 종료될 때 필요하다면 자동으로 Coalesce 수행
        - spark.sql.adaptive.coalescePartitions.enabled
- 매 설정에 따라 파티션의 크기는 최소 크기 혹은 목표 크기를 맞추려 동작
        - spark.sql.adaptive.advisoryPartitionSizeInBytes
        - spark.sql.adaptive.coalescePartitions.minPartitionSize
        - 무엇을 사용할지는 spark.sql.adaptive.coalescePartitions.parallelismFirst에 의해 결정된다.



Dynamically coalescing shuffle partitions 관련 변수(Spark 3. 3. 1)


Dynamically switching join strategies는 무엇인가?
- 필요한 이유
        - Static Query Plan이 여러 이유로 BHJ(Broadcast Hash Join) 기회를 놓친 경우
                - 조인대상 DataFrame들에 대한 통계정보 부족(필터링 등)
                - UDF가 사용된 경우
- AQE의 해법
        - Runtime 통계정보를 바탕으로 조인 전략을 변경
                - stage들이 끝나고 조인되기 전에 다시 쿼리 플래닝을 수행
        - 두가지 옵션 존재
                - Broadcast Join(추천되며 우선순위를 가짐)
                - Shuffle Hash Join

Dynamically switching join strategies 동작방식



Dynamically switching join strategies 관련 환경변수(Spark 3.3.1)