주니어 데이터 엔지니어 우솨's 개발일지

데이터 엔지니어링 45일차 TIL 본문

데브코스

데이터 엔지니어링 45일차 TIL

우솨 2024. 6. 5. 19:11

학습내용

ETL방법
- 카피하려는 테이블의 레코드 수가 적다면 INSERT INTO 사용
- 레코드의 수가 많다면 s3에서 redshift로 벌크 업데이트(COPY커맨드 사용)

AWS S3 접근(Connections 설정)
- Access Key ID와 Secret Access Key를 사용하는 걸로 바뀜

    - 루트 사용자의 키들을 사용하면 해킹시 AWS 자원들을 마음대로 사용 가능 -> 여러번 사고가

    - 우리가 사용해볼 Best Practice는:
    - IAM(Identity and Access Management)을 사용해 별도의 사용자를 만들고
    - 그 사용자에게 해당 S3 bucket을 읽고 쓸 수 있는 권한을 제공하고
    - 그 사용자의 Access Key ID와 Secret Access Key를 사용
    - 이 키도 주기적으로 변경해서 해킹이 될 경우의 피해를 최소화한다

MySQL_to_Redshift DAG의 Task 구성
- SqlToS3Operator
   -MySQL SQL 결과 -> S3
   - (s3://grepp-data-engineering/{본인ID}-nps)
   - s3://s3_bucket/s3_key
- S3ToRedshiftOperator
   - S3 -> Redshift 테이블
   - (s3://grepp-data-engineering/{본인ID}-nps) -> Redshift (본인스키마.nps)
   - COPY command is used
   - replace = True => 덮어쓰기 허용

MySQL 테이블의 Incremental Update 방식
- MySQL/PostgreSQL 테이블이라면 다음을 만족해야함
   - created (timestamp): Optional
   - modified (timestamp)
   - deleted (boolean): 레코드를 삭제하지 않고 deleted를 True로 설정


- Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면
- ROW_NUMBER로 직접 구현하는 경우
   - 먼저 Redshift의 A 테이블의 내용을 temp_A로 복사
   - MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어다가 temp_A로 복사
- 아래는 MySQL에 보내는 쿼리. 결과를 파일로 저장한 후 S3로 업로드하고 COPY 수행
- SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
   - temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 DESC 정렬해서, 일련번호가 1인 것들만 다시 A로 복사


- Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면
- S3ToRedshiftOperator로 구현하는 경우
   - query 파라미터로 아래를 지정
- SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
   - method 파라미터로 “UPSERT”를 지정
   - upsert_keys 파라미터로 Primary key를 지정
- 앞서 nps 테이블이라면 “id” 필드를 사용

Backfill
- DAG 파라미터 max_active_runs를 이용해서 여러 날짜에대해 실행해도 한번에 하나만 연달아 실행되도록 할 수 있다.
CLI에서 Backfill실행
airflow dags backfill dag_id -s 2018-07-1 -e 2018-08-01
- catchup이 True로 설정되어 있어야한다.
- execution_date를 이용해서 Incremental update가 구현되어 있어야 한다.
- start_date부터 시작하지만 end_date는 포함하지 않는다.
- 실행순서는 날짜/시간순이 아니라 랜덤이다.
   - 만일 날짜 순으로 하고 싶다면 DAG default_args의 depends_on_past를 True로 설정해야 한다.

DAG Backfill 준비
- 먼저 모든 DAG가 backfill을 필요로 하지는 않음
   - Full Refresh를 한다면 backfill은 의미가 없음
- 여기서 backfill은 일별 혹은 시간별로 업데이트하는 경우를 의미함
   - 마지막 업데이트 시간 기준 backfill을 하는 경우라면 (Data Warehouse 테이블에 기록된 시간 기준) 이런 경우에도 execution_date을 이용한 backfill은 필요하지 않음
- 데이터의 크기가 굉장히 커지면 backfill 기능을 구현해 두는 것이 필수
   - airflow가 큰 도움이 됨
   - 하지만 데이터 소스의 도움 없이는 불가능

- 어떻게 backfill로 구현할 것인가
   - 제일 중요한 것은 데이터 소스가 backfill 방식을 지원해야함
   - “execution_date”을 사용해서 업데이트할 데이터 결정
   - “catchup” 필드를 True로 설정
   - start_date/end_date을 backfill하려는 날짜로 설정
   - 다음으로 중요한 것은 DAG 구현이 execution_date을 고려해야 하는 것이고 idempotent 해야함

Airflow란 무엇인가?
- Airflow는 파이썬으로 작성된 데이터 파이프라인 (ETL) 프레임웍
   - 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임웍
   - Airflow에서 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름
- Airflow의 장점
   - 데이터 파이프라인을 세밀하게 제어 가능
   - 다양한 데이터 소스와 데이터 웨어하우스를 지원
   - 백필(Backfill)이 쉬움
- Airflow 관련 중요 용어/개념
   - start_date, execution_date, catchup
- 스케일링 방식
   - Scale Up vs. Scale Out vs. 클라우드 버전 vs. K8s 사용

데이터 파이프라인 작성시 기억할 점
- 데이터 파이프라인에 관한 정보를 수집하는 것이 중요
     - 비지니스 오너와 데이터 리니지에 주의할 것
     - 결국 데이터 카탈로그가 필요
- 데이터 품질 체크
     - 입력 데이터와 출력 데이터
- 코드 실패를 어설프게 복구하려는 것보다는 깔끔하게 실패하는 것이 좋음
- 가능하면 Full Refresh
     - Incremental Update를 쓸 수 밖에 없다면 Backfill 방식을 먼저 생각해둘 것 -> Airflow가 필요한
이유
- 주기적인 청소 (데이터, 테이블, Dag)