주니어 데이터 엔지니어 우솨's 개발일지

데이터 엔지니어링 73일차 TIL 본문

데브코스

데이터 엔지니어링 73일차 TIL

우솨 2024. 7. 10. 13:41

학습 내용

Dynamically optimizing skew joins
: Skew된 조인을 효율적으로 처리하는 방법
Skew 파티션으로 인한 성능 문제를 해결하기 위함
        - 한 두개의 오래 걸리는 태스크들로 인한 전체 Job/Stage 종료 지연
        - disk spill이 발생한다면 더 느려지게 된다.
- AQE에서의 해법
        - skew 파티션의 존재 여부 파악
        - skew 파티션을 작게 나누고 상대 조인 파티션을 중복하여 만들고 조인 수행

Dynamically optimizing skew joins 동작방식




Dynamically optimizing skew joins 환경변수(Spark 3.3.1)



파티션 관련 환경변수(Spark 3.3.1)
- spark.sql.shuffle.partitions
        - 클러스터 자원과 처리 데이터의 크기를 고려해서 job마다 바꿔서 설정
        - 큰 데이터를 처리하는 거라면 클러스터 전체 코어의 수로 설정
        - AQE를 사용하는 관점에서는 조금 더 크게 잡는 것이 좋다.



Salting이란?
: Skew Partition을 처리하기 위한 테크닉
        - AQE의 등장으로 많이 쓰이지는 않음
        - AQE만으로 skew partition 이슈가 사라지지 않는다면 사용
- 랜덤 필드를 만들고 그 기준으로 파티션을 새로 만들어서 처리
        - Aggregation 처리의 경우 효과적
        - Join의 경우 효과적이지 않다.
                - AQE에 의존하는 것이 좋다
                - Join시 사용 Salting 테크닉을 일반화한 것이 AQE의 skew JOIN처리방식이다.



Spill이란?
: 파티션의 크기가 너무 커서 메모리가 부족한 경우 그 여분을 디스크에 쓰는 것
        - Spill이 발생하면 실행시간이 늘어나고 OOM이 발생할 가능성이 올라간다.
- Spill이 발생하는 경우(메모리가 부족해지는 경우)
        - Skew Partition 대상 Aggregation
        - Skew Partition 대상 Join
        - 굉장히 큰 explode 작업
        - 큰 파티션이라면 위의 작업시 spill가능성이 더 높아진다.

Spill의 종류
- Memory
        - 디스크로 spill된 데이터가 메모리에 있으르 때의 크기
        - Deserialized 형태라 크기가 보통 8~10배 정도 더 크다.
- Disk
        - 메모리에서 spill된 데이터가 디스크에서 차지하는 크기
        - Serialized 형태라 보통 크기가 훨씬 더 작다.

Partition의 종류 (Life cycle of Partitons)
: 파티션의 크기는 128MB ~ 1GB가 좋다.
a. 입력 데이터를 로드할 때 파티션 수와 크기
- maxSplitBytes 결정공식
        - bytesPerCore = (데이터 파일들 전체크기 + 파일수 * OpenCostInBytes) / default.parallelism
        - maxSplitBytes = Min(maxPartitonBytes, bytesPerCore)
- 파티션 생성 방법
        - Splittable한 파일은 maxSplitBytes 단위로 분할하여 File Chunk 생성.
        - Splittable하지 않거나 작은 파일은 하나의 File Chunk로 생성.
        - 여러 File Chunk를 하나의 파티션으로 병합, maxSplitBytes를 넘지 않도록 설정.

b. 셔플링 후 만들어지는 파티션 수와 크기
- spark.sql.shuffle.partitions
        - 기본값: 200
        - 클러스터 자원과 데이터 크기를 고려하여 설정.
        - 큰 데이터를 처리할 경우 더 큰 값으로 설정, AQE를 사용하는 경우 더 크게 설정.

c. 데이터를 최종적으로 저장할 때 파티션 수와 크기
- bucketBy 사용:
        - 특정 ID 기준으로 데이터를 나눠 저장.
        - ex) df.write.mode("overwrite").bucketBy(3, "key").saveAsTable("table")
- partitionBy 사용:
        - 데이터 생성 시간 등 특정 기준으로 폴더 구조로 저장.
        - ex) df.write.mode("overwrite").partitionBy("order_month").saveAsTable("order")
- bucketBy와 partitionBy 혼용:
        - 필터링 기준으로 partitionBy 후, 그룹핑/조인 기준으로 bucketBy 사용.
        - ex) df.write.mode("overwrite").partitionBy("dept").\

                bucketBy(5, "employeeId").saveAsTable("table")

입력 데이터를 로드할 때 파티션 수와 크기
- 기본적으로 파티션의 최대 크기에 의해 결정된다.
        - spark.sql.files.maxpartitonbytes (128MB)
- 해당 데이터가 어떻게 저장되었는지와 연관이 많이 된다.
        - 파일포맷이 무엇이고 압축되었는지? 압축되었다면 무슨 알고리즘으로?
                - 결국 Splittable 한가? =>하나의 큰 파일을 다수의 파티션으로 나눠 로드할 수 있는가?
        - 기타 관련 Spark 환경변수들



입력 데이터 파티션 수와 크기를 결정해주는 변수들
- bucketBy로 저장된 데이터를 읽는 경우
        - Bucket의 수와 Bucket 기준 컬럼들과 정렬 기준 컬럼들
- 읽어들이는 데이터 파일이 splittable한지?
        - PARQUET/AVRO등이 좋은 이유 : 항상 splittable하다
        - JSON/CSV 등의 경우 한 레코드가 multi-line이라면 splittable하지 않음.
                - Single line이라도 압축시 bzip2를 사용해야만 splittable하다.
- 입력 데이터의 전체 크기(모든 파일크기의 합)
- 리소스 매니저에게 요청한 CPU의 수

입력 데이터 파티션 수와 크기 결정 방식
- maxSplitBytes 결정공식
        - bytesPerCore = (데이터 파일들 전체크기 + 파일수 * OpenCostInBytes) / default.parallelism
        - maxSplitBytes = Min(maxPartitonBytes, bytesPerCore)
- 입력 데이터를 구성하는 각 파일에 대해 다음을 진행
        - Splittable하다면 maxSplitBytes 단위로 분할하여 File Chunk 생성
        - Splittable하지 않거나 크기가 maxSplitBytes보다 작다면 하나의 File chunk생성
- 위에서 만들어진 File Chunk들로부터 파티션 생성
        - 기본적으로 하나의 파티션은 하나 혹은 그 이상의 File Chunk들로 구성
        - 한 파티션에 다음 File Chunk의 크기 + openCostInBytes를 더했을 때의 값이 maxSplitBytes를 넘어가지 않을 때까지 계속해서 머지함(파일들이 하나의 파티션으로 패킹된다.)
=> 코어별로 처리해야할 크기가 너무 크다면 maxPartitonBytes로 제약
=> 데이터의 크기가 적당하면 코어수 만큼의 파티션을 생성
=> 데이터의 크기가 너무 작으면 openCostInBytes로 설정한 크기로 파티션 생성

Bucketing(bucketBy)
: 데이터를 자주 사용되는 컬럼 기주능로 미리 저장해두고 활용
- 다양한 최적화 기능
        - 조인 대상 테이블들이 조인 키를 갖고 bucketing된 경우 shuffle free join가능
        - 한쪽만 bucketing되어있는 경우 one-side shuffle free join 가능(bucket의 크기에 달려있다.)
        - Bucket pruinng을 통한 최적화 가능
        - Shuffle free aggregation
- Bucket 정보가 metastore에 저장되고 Spark Compiler는 이를 활용
        - sortBy를 통해 순서를 미리 정해주기도 한다.
        - Spark 테이블로 저장하고 로딩해야지만 이 정보를 이용 가능함.
                - saveAsTable로 저장, spark.table()로 로딩

Bucketing(bucketBy) 저장방식
- Bucket의 수 * Partition의 수 만큼의 파일이 만들어진다.
        - ex) DataFrame의 Partiton 수가 10이고 Bucket의 수가 4라면 40개의 파일 생성
        - 다시 읽어들일 때 10개의 Partition으로 읽혀짐
- 다시 읽어들일 때 원래의 Partiton의 수만큼으로 재구성된다.
- Bucketing 키를 기반으로 작업시 셔플이 없어짐

Small Files 신드롬이란?
: 작은 크기의 많은 파일이 문제가 되는 것
- 64MB의 파일 하나를 읽는 것 vs 64Byte의 파일 백만개를 읽는 것
- API콜은 모두 네트워크 RPC 콜
- 파일 시스템 접근 관련 오버헤드
        - 파일 하나를 접근하기 위해서 다수의 API콜이 필요로 하다.
        - openCostInBytes라는 오버헤드가 각 파일마다 부여됨
- 읽어들이면서 파티션의 수를 줄일 수 있지만 오버헤드가 크다.
        - 파일로 쓸 때 어느정도 정리를 해주는 것이 필요하다.

데이터를 저장할 때 파티션 수와 크기(기본)
- bucketBy나 partitionBy를 사용하지 않는 경우
        - 각 파티션이 하나의 파일로 쓰여짐
        - saveAsTable vs .save
- 적당한 크기와 수의 파티션을 찾는 것이 중요
        - 작은 크기의 다수의 파티션이 있다면 문제
        - 큰 크기의 소수의 파티션도 문제(splittable하지 않은 포맷으로 저장이 될 경우)
- Repartition 혹은 coalesce를 적절히 사용
        - 이 경우 AQE의 Coalescing가 도움이 될 수 있다(repartition)
- PARQUET포맷 사용
        - Snappy compression 사용

데이터를 저장할 때 파티션 수와 크기(bucketBy)
- 데이터의 특성을 잘 아는 경우 특정 ID를 기준으로 나눠서 테이블로 저장
        - 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
        - Bucket의 수와 기준 ID지정
- bucket의 수와 키를 지정해야한다.
        - df.write.mode("overwrite").bucketBy(3, key).saveAsTable(table)
        - sortBy를 사용하여 순서를 정하기도 한다.
        - 이 정보는 metastore에 같이 저장이 된다.

데이터를 저장할 때 파티션 수와 크기(partitonBy)
- 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 할 때
        - 데이터 자체를 연도-월-일의 폴더 구조로 저장
                - 데이터 읽기과정을 최적화, 데이터관리 용이(Retention Policy적용시)
        - Partition key를 잘못 선택하면 엄청나게 많은 파일들이 생성된다.
                - Cardinality가 높은 컬럼을 키로 지정하면 안된다 !
- Partitioning할 키를 지정해야한다.
        - df.write.mode("overwrite").partitionBy("order_month").saveAsTable("order")
        - df.write.mode("overwrite").partitionBy("year", "month", "day").saveAsTable("appl_stock")

데이터를 저장할 때 파티션의 수와 크기 : bucketBy & partitionBy
- partitonBy 후에 bucketBy사용
- 필터링 패턴 기준 partitionBy 후 그룹핑/조인 패턴 기준 bucketBy 사용
- df.write.mode("overwrite").partitionBy("dept").bucketBy(5, "employeeId")

Spark Persistent 테이블의 통계 정보 확인
- spark.sql("DESCRIBE EXTENDED 테이블이름")
        - bucket/partition 테이블정보를 얻을 수 있다.
ex) spark.sql("DESCRIBE EXTENDED appl_stock").show()