일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Tableau
- Snowflake
- Til
- SQL
- 슈퍼셋
- HTML
- 코테 연습
- Selenium
- Kafka
- AWS
- 팀 프로젝트
- airflow
- 코딩테스트
- 데이터 엔지니어
- django
- beuatifulsoup
- cloud platform
- superset
- 코딩 테스트
- PCCP
- 데이터 시각화
- 데브코스
- Spark
- Today
- Total
주니어 데이터 엔지니어 우솨's 개발일지
데이터 엔지니어링 73일차 TIL 본문
학습 내용
Dynamically optimizing skew joins
: Skew된 조인을 효율적으로 처리하는 방법
Skew 파티션으로 인한 성능 문제를 해결하기 위함
- 한 두개의 오래 걸리는 태스크들로 인한 전체 Job/Stage 종료 지연
- disk spill이 발생한다면 더 느려지게 된다.
- AQE에서의 해법
- skew 파티션의 존재 여부 파악
- skew 파티션을 작게 나누고 상대 조인 파티션을 중복하여 만들고 조인 수행
Dynamically optimizing skew joins 동작방식
Dynamically optimizing skew joins 환경변수(Spark 3.3.1)
파티션 관련 환경변수(Spark 3.3.1)
- spark.sql.shuffle.partitions
- 클러스터 자원과 처리 데이터의 크기를 고려해서 job마다 바꿔서 설정
- 큰 데이터를 처리하는 거라면 클러스터 전체 코어의 수로 설정
- AQE를 사용하는 관점에서는 조금 더 크게 잡는 것이 좋다.
Salting이란?
: Skew Partition을 처리하기 위한 테크닉
- AQE의 등장으로 많이 쓰이지는 않음
- AQE만으로 skew partition 이슈가 사라지지 않는다면 사용
- 랜덤 필드를 만들고 그 기준으로 파티션을 새로 만들어서 처리
- Aggregation 처리의 경우 효과적
- Join의 경우 효과적이지 않다.
- AQE에 의존하는 것이 좋다
- Join시 사용 Salting 테크닉을 일반화한 것이 AQE의 skew JOIN처리방식이다.
Spill이란?
: 파티션의 크기가 너무 커서 메모리가 부족한 경우 그 여분을 디스크에 쓰는 것
- Spill이 발생하면 실행시간이 늘어나고 OOM이 발생할 가능성이 올라간다.
- Spill이 발생하는 경우(메모리가 부족해지는 경우)
- Skew Partition 대상 Aggregation
- Skew Partition 대상 Join
- 굉장히 큰 explode 작업
- 큰 파티션이라면 위의 작업시 spill가능성이 더 높아진다.
Spill의 종류
- Memory
- 디스크로 spill된 데이터가 메모리에 있으르 때의 크기
- Deserialized 형태라 크기가 보통 8~10배 정도 더 크다.
- Disk
- 메모리에서 spill된 데이터가 디스크에서 차지하는 크기
- Serialized 형태라 보통 크기가 훨씬 더 작다.
Partition의 종류 (Life cycle of Partitons)
: 파티션의 크기는 128MB ~ 1GB가 좋다.
a. 입력 데이터를 로드할 때 파티션 수와 크기
- maxSplitBytes 결정공식
- bytesPerCore = (데이터 파일들 전체크기 + 파일수 * OpenCostInBytes) / default.parallelism
- maxSplitBytes = Min(maxPartitonBytes, bytesPerCore)
- 파티션 생성 방법
- Splittable한 파일은 maxSplitBytes 단위로 분할하여 File Chunk 생성.
- Splittable하지 않거나 작은 파일은 하나의 File Chunk로 생성.
- 여러 File Chunk를 하나의 파티션으로 병합, maxSplitBytes를 넘지 않도록 설정.
b. 셔플링 후 만들어지는 파티션 수와 크기
- spark.sql.shuffle.partitions
- 기본값: 200
- 클러스터 자원과 데이터 크기를 고려하여 설정.
- 큰 데이터를 처리할 경우 더 큰 값으로 설정, AQE를 사용하는 경우 더 크게 설정.
c. 데이터를 최종적으로 저장할 때 파티션 수와 크기
- bucketBy 사용:
- 특정 ID 기준으로 데이터를 나눠 저장.
- ex) df.write.mode("overwrite").bucketBy(3, "key").saveAsTable("table")
- partitionBy 사용:
- 데이터 생성 시간 등 특정 기준으로 폴더 구조로 저장.
- ex) df.write.mode("overwrite").partitionBy("order_month").saveAsTable("order")
- bucketBy와 partitionBy 혼용:
- 필터링 기준으로 partitionBy 후, 그룹핑/조인 기준으로 bucketBy 사용.
- ex) df.write.mode("overwrite").partitionBy("dept").\
bucketBy(5, "employeeId").saveAsTable("table")
입력 데이터를 로드할 때 파티션 수와 크기
- 기본적으로 파티션의 최대 크기에 의해 결정된다.
- spark.sql.files.maxpartitonbytes (128MB)
- 해당 데이터가 어떻게 저장되었는지와 연관이 많이 된다.
- 파일포맷이 무엇이고 압축되었는지? 압축되었다면 무슨 알고리즘으로?
- 결국 Splittable 한가? =>하나의 큰 파일을 다수의 파티션으로 나눠 로드할 수 있는가?
- 기타 관련 Spark 환경변수들
입력 데이터 파티션 수와 크기를 결정해주는 변수들
- bucketBy로 저장된 데이터를 읽는 경우
- Bucket의 수와 Bucket 기준 컬럼들과 정렬 기준 컬럼들
- 읽어들이는 데이터 파일이 splittable한지?
- PARQUET/AVRO등이 좋은 이유 : 항상 splittable하다
- JSON/CSV 등의 경우 한 레코드가 multi-line이라면 splittable하지 않음.
- Single line이라도 압축시 bzip2를 사용해야만 splittable하다.
- 입력 데이터의 전체 크기(모든 파일크기의 합)
- 리소스 매니저에게 요청한 CPU의 수
입력 데이터 파티션 수와 크기 결정 방식
- maxSplitBytes 결정공식
- bytesPerCore = (데이터 파일들 전체크기 + 파일수 * OpenCostInBytes) / default.parallelism
- maxSplitBytes = Min(maxPartitonBytes, bytesPerCore)
- 입력 데이터를 구성하는 각 파일에 대해 다음을 진행
- Splittable하다면 maxSplitBytes 단위로 분할하여 File Chunk 생성
- Splittable하지 않거나 크기가 maxSplitBytes보다 작다면 하나의 File chunk생성
- 위에서 만들어진 File Chunk들로부터 파티션 생성
- 기본적으로 하나의 파티션은 하나 혹은 그 이상의 File Chunk들로 구성
- 한 파티션에 다음 File Chunk의 크기 + openCostInBytes를 더했을 때의 값이 maxSplitBytes를 넘어가지 않을 때까지 계속해서 머지함(파일들이 하나의 파티션으로 패킹된다.)
=> 코어별로 처리해야할 크기가 너무 크다면 maxPartitonBytes로 제약
=> 데이터의 크기가 적당하면 코어수 만큼의 파티션을 생성
=> 데이터의 크기가 너무 작으면 openCostInBytes로 설정한 크기로 파티션 생성
Bucketing(bucketBy)
: 데이터를 자주 사용되는 컬럼 기주능로 미리 저장해두고 활용
- 다양한 최적화 기능
- 조인 대상 테이블들이 조인 키를 갖고 bucketing된 경우 shuffle free join가능
- 한쪽만 bucketing되어있는 경우 one-side shuffle free join 가능(bucket의 크기에 달려있다.)
- Bucket pruinng을 통한 최적화 가능
- Shuffle free aggregation
- Bucket 정보가 metastore에 저장되고 Spark Compiler는 이를 활용
- sortBy를 통해 순서를 미리 정해주기도 한다.
- Spark 테이블로 저장하고 로딩해야지만 이 정보를 이용 가능함.
- saveAsTable로 저장, spark.table()로 로딩
Bucketing(bucketBy) 저장방식
- Bucket의 수 * Partition의 수 만큼의 파일이 만들어진다.
- ex) DataFrame의 Partiton 수가 10이고 Bucket의 수가 4라면 40개의 파일 생성
- 다시 읽어들일 때 10개의 Partition으로 읽혀짐
- 다시 읽어들일 때 원래의 Partiton의 수만큼으로 재구성된다.
- Bucketing 키를 기반으로 작업시 셔플이 없어짐
Small Files 신드롬이란?
: 작은 크기의 많은 파일이 문제가 되는 것
- 64MB의 파일 하나를 읽는 것 vs 64Byte의 파일 백만개를 읽는 것
- API콜은 모두 네트워크 RPC 콜
- 파일 시스템 접근 관련 오버헤드
- 파일 하나를 접근하기 위해서 다수의 API콜이 필요로 하다.
- openCostInBytes라는 오버헤드가 각 파일마다 부여됨
- 읽어들이면서 파티션의 수를 줄일 수 있지만 오버헤드가 크다.
- 파일로 쓸 때 어느정도 정리를 해주는 것이 필요하다.
데이터를 저장할 때 파티션 수와 크기(기본)
- bucketBy나 partitionBy를 사용하지 않는 경우
- 각 파티션이 하나의 파일로 쓰여짐
- saveAsTable vs .save
- 적당한 크기와 수의 파티션을 찾는 것이 중요
- 작은 크기의 다수의 파티션이 있다면 문제
- 큰 크기의 소수의 파티션도 문제(splittable하지 않은 포맷으로 저장이 될 경우)
- Repartition 혹은 coalesce를 적절히 사용
- 이 경우 AQE의 Coalescing가 도움이 될 수 있다(repartition)
- PARQUET포맷 사용
- Snappy compression 사용
데이터를 저장할 때 파티션 수와 크기(bucketBy)
- 데이터의 특성을 잘 아는 경우 특정 ID를 기준으로 나눠서 테이블로 저장
- 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
- Bucket의 수와 기준 ID지정
- bucket의 수와 키를 지정해야한다.
- df.write.mode("overwrite").bucketBy(3, key).saveAsTable(table)
- sortBy를 사용하여 순서를 정하기도 한다.
- 이 정보는 metastore에 같이 저장이 된다.
데이터를 저장할 때 파티션 수와 크기(partitonBy)
- 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 할 때
- 데이터 자체를 연도-월-일의 폴더 구조로 저장
- 데이터 읽기과정을 최적화, 데이터관리 용이(Retention Policy적용시)
- Partition key를 잘못 선택하면 엄청나게 많은 파일들이 생성된다.
- Cardinality가 높은 컬럼을 키로 지정하면 안된다 !
- Partitioning할 키를 지정해야한다.
- df.write.mode("overwrite").partitionBy("order_month").saveAsTable("order")
- df.write.mode("overwrite").partitionBy("year", "month", "day").saveAsTable("appl_stock")
데이터를 저장할 때 파티션의 수와 크기 : bucketBy & partitionBy
- partitonBy 후에 bucketBy사용
- 필터링 패턴 기준 partitionBy 후 그룹핑/조인 패턴 기준 bucketBy 사용
- df.write.mode("overwrite").partitionBy("dept").bucketBy(5, "employeeId")
Spark Persistent 테이블의 통계 정보 확인
- spark.sql("DESCRIBE EXTENDED 테이블이름")
- bucket/partition 테이블정보를 얻을 수 있다.
ex) spark.sql("DESCRIBE EXTENDED appl_stock").show()
'데브코스' 카테고리의 다른 글
데이터 엔지니어링 75일차 TIL (0) | 2024.07.13 |
---|---|
데이터 엔지니어링 74일차 TIL (0) | 2024.07.11 |
데이터 엔지니어링 72일차 TIL (0) | 2024.07.09 |
데이터 엔지니어링 71일차 TIL (0) | 2024.07.08 |
데이터 엔지니어링 70일차 TIL (0) | 2024.07.06 |