주니어 데이터 엔지니어 우솨's 개발일지

데이터 엔지니어링 71일차 TIL 본문

데브코스

데이터 엔지니어링 71일차 TIL

우솨 2024. 7. 8. 21:23

학습내용

Broadcast Variable이란?
: 록업 테이블등을 브로드캐스팅하여 셔플링을 막는 방식으로 사용
- 브로드캐스트 조인에서 사용되는 것과 동일한 테크닉
- 대부분 룩업 테이블 (혹은 디멘션 테이블 - 10~20MB)을 Executor로 전송하는데 사용
        - 많은 DB에서 스타 스키마 형태로 팩트 테이블과 디멘션 테이블을 분리
- spark.sparkContext.broadcast를 사용

Broadcast Variable : 룩업 테이블(파일)을 UDF로 보내는 방법
- Closure
        - Serialization이 태스크 단위로 일어남
        - UDF안에서 파이썬 데이터 구조를 사용하는 경우
- Broadcast
        - Serialization이 Worker Node 단위로 일어남
        - UDF안에서 브로드캐스트된 데이터 구조를 사용하는 경우
- Broadcast 데이터셋의 특징
        - Worker node로 공유되는 변경 불가 데이터
        - Worker node별로 한번 공유되고 캐싱됨
        - 제약점은 Task Memory안에 들어갈 수 있어야함

Accumulators란?
- 특정 이벤트의 수를 기록하는데 사용 -> 일종의 전역 변수
        - 하둡의 카운터와 흡사
- ex) 비정상적인 값을 갖는 레코드의 수를 세는데 사용

Accumulators의 특징
- 변경 가능한 전역변수로 드라이버에 위치
- 스칼라로 만들면 이름을 줄 수 있지만 그 이외에는 불가
        - 이름있는 accumulator만 Spark Web UI에서 나타남.
- 레코드 별로 세거나 합을 구하는데 사용 가능
- 두 가지 방법으로 사용 가능하며 값의 정확도도 달라진다.
        1.Transformation에서 사용
          - 값이 부정확 할 수 있다.(태스크의 재실행과 speculative execution)
        2. DataFrame/RDD Foreach에서 사용
          - 추천되는 방식으로 값이 매우 정확하다.

Speculative Execution
: 느린 태스크를 다른 Worker node에 있는 Executor에서 중복 실행
        - 이를 통해 Worker node의 하드웨어 이슈등으로 느려지는 경우 빠른 실행을 보장
        - Data Skew로 인해 오래 걸린다면 도움이 되지 않고 리소스만 낭비하게 됨

Speculative Execution제어방식
- spark.speculation으로 컨트롤 가능하며 기본은 False(비활성화)
        - 하둡 Mapreduce에서부터 있던 기능
- 다양한 환경변수로 세밀하게 제어가 가능하다



Spark의 리소스 할당(스케줄링)
- Spark Application들간의 리소스 할당
        - 기반이 되는 리소스 매니저가 결정(YARN은 세가지 방식 지원 : FIFO, FAIR, CAPACITY)
        - 한번 리소스를 할당받으면 해당 리소스를 끝까지 들고 가는 것이 기본
- 하나의 Spark Application안에서 잡들간의 리소스 할당
        - FIFO 형태로 처음 잡이 필요한대로 리소스를 받아서 쓰는 것이 기본

Spark Application의 리소스 요구/릴리스 방식
- Static Allocation(기본동작)
        - Spark Application은 리소스 매니저로부터 (YARN) 받은 리소스를 보통 끝까지 들고간다
        - 이는 리소스 사용률에 악영향을 줄 가능성이 높다.
- Dynamic Allocation
        - Spark Application이 상황에 따라 executor를 릴리스하기도 하고 요구하기도 한다.
        - 다수의 Spark Application들이 하나의 리소스 매니저를 공유한다면 활성화하는 것이 좋다.

Dynamic Resource Allocation의 환경변수들
spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.shuffleTracking.enabled = true
spark.dynamicAllocation.executorldleTimeout = 60s (릴리스 타이밍 결정)
spark.dynamicAllocation.schedulerBacklogTimeout = 1s (요청 타이밍 결정)
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors
spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation.executorAllocationRatio

Spark Scheduler란?
: 하나의 Spark Application내의 Job들에 리소스를 나눠주는 정책
        - Spark Application들간에 리소스를 나눠주는 방식은 리소스 매니저에게 달려있다.
- FIFO(First In First Out -기본)
        - 리소스를 처음 요청한 Job에게 리소스 우선순위가 간다.
- FAIR
        - 라운드로빈 방식으로 모든 잡에게 고르게 리소스를 분배하는 방식
        - 이 안에서 풀(POOL)이라 형태로 리소스를 나눠서 우선순위를 고려한 형태로 사용이 가능하다.
                - 풀 안에서 리소스 분배도 FAIR 혹은 FIFO로 지정 가능하다.
 
Scheduler를 활용한 병렬성 증대
- 병렬성 증대 -> Thread 활용이 필요
        - FAIR 모드의 스케줄러일 경우 더 효과적이다.
- 관련 환경 변수
        - spark.scheduler.mode : FIFO 혹은 FAIR
        - spark.scheduler.allocation.file : FAIR인 경우 필요하며 풀을 정의해놓는 형태로 사용

Spark Driver의 역할
: Spark Application = (1 Driver) + (1 + Executor)
- main함수를 실행하고 SparkSession/SparkContet를 생성
- 코드를 태스크로 변환하여 DAG 생성
- 이를 execution/logical/physical plan으로 변환한다.
- 리소스 매니저의 도움을 받아 태스크들을 실행하고 관리
        - task의 수가 너무 많아지면 driver 메모리 에러 발생
- 위의 정보들을 Web UI로 노출시킴(4040 포트)

Driver의 메모리 구성


Executor 메모리 구성


Heap 메모리 구성


Spark 메모리 구성


Executor CPU 구성


스파크의 메모리 에러
- Driver OOM 에러가 발생하는 이유
        - 큰 데이터셋에 collect를 실행
        - 큰 데이터셋을 Broadcast join
        - Python이나 R 등으로 작성된 코드
        - 너무 많은 태스크들

- Executor OOM 에러가 발생하는 이유
        - 너무 큰 executor.cores 값(High Concurrency)
        - Data Skew(Big Partition)

PySpark Driver
- Python 프로세스 + JVM 프로세스



Spark와 Python간의 통신
- Py4J : 파이썬과 JVM간의 데이터 교환을 통해 둘간의 연동을 도와주는 프레임웍
- DataFrame/RDD 연산중에 파이썬 코드가 사용된다면?
        => 별도의 파이썬 프로세스를 통해 실행되며 파티션 데이터가 모두 넘어간다.
1. 드라이버 위의 파이썬 프로세스에서 실행할 코드와 기타 데이터를 serialize해서 Executor에 전송
2. JVM executor는 파이썬 프로세스 실행(PySpark script)
3. Executor는 이것과 파티션을 serialize하고 위에서 받은 코드와 함께 파이썬 프로세스로 전송
4. 파이썬 프로세스에서 계산이 끝나면 결과가 다시 Executor로 serialize되어서 전송된다.

Spark Caching
- 자주 사용되는 데이터프레임을 메모리에 유지하여 처리속도 증가
        - 데이터프레임이 정말 메모리에 있는지 확인이 필요
        - 다시 계산하는 것이 빠를 경우도 있다
- 메모리 소비를 늘리기 때문에 불필요하게 모든 걸 캐싱할 필요는 없다.

Spark에서의 DataFrame Caching
- 두가지 방법 존재
        - persist()
        - cache()
- 둘 모두 데이터프레임을 메모리/디스크/오프힙에 보존한다.
        - 둘 모두 lazy execution - 필요해지기 전까지 캐싱하지 않음
        - caching은 항상 파티션 단위로 메모리에 보존(하나의 파티션이 부분적으로 caching되지 않는다.)

Spark persist/cache
: 인자를 통해 세부 제어 가능
- useDisk = True
- useMemory = True
- useOffHeap = False  -- off Heap 설정이 필요
- deserialized = False
        - 메모리를 줄일지 아니면 CPU 계산을 줄일지 => CPU계산을 줄이는게 효율적
        - deserialized = True 는 메모리에서만 가능하다.
- replication = 1
        - 몇 개의 복사본을 서로 다른 executor에 저장할지 결정
=> persist는 기본적으로 caching되는 데이터프레임을 메모리와 디스크에 보관하고 복제도 수행한다.

- cache는 persist의 다음 조건과 같다
        - disk = False
        - memory = True
        - offHeap = False
        - deserialized = True
        - replication = 1