일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Til
- 데이터 시각화
- HTML
- Kafka
- 데브코스
- AWS
- 팀 프로젝트
- PCCP
- Snowflake
- cloud platform
- VPC
- Selenium
- 슈퍼셋
- Tableau
- 데이터 엔지니어
- 코테 연습
- 코딩테스트
- SQL
- django
- airflow
- Spark
- GCP
- 코딩 테스트
- superset
- beuatifulsoup
- Today
- Total
주니어 데이터 엔지니어 우솨's 개발일지
데이터 엔지니어링 45일차 TIL 본문
학습내용
ETL방법
- 카피하려는 테이블의 레코드 수가 적다면 INSERT INTO 사용
- 레코드의 수가 많다면 s3에서 redshift로 벌크 업데이트(COPY커맨드 사용)
AWS S3 접근(Connections 설정)
- Access Key ID와 Secret Access Key를 사용하는 걸로 바뀜
- 루트 사용자의 키들을 사용하면 해킹시 AWS 자원들을 마음대로 사용 가능 -> 여러번 사고가
남
- 우리가 사용해볼 Best Practice는:
- IAM(Identity and Access Management)을 사용해 별도의 사용자를 만들고
- 그 사용자에게 해당 S3 bucket을 읽고 쓸 수 있는 권한을 제공하고
- 그 사용자의 Access Key ID와 Secret Access Key를 사용
- 이 키도 주기적으로 변경해서 해킹이 될 경우의 피해를 최소화한다
MySQL_to_Redshift DAG의 Task 구성
- SqlToS3Operator
-MySQL SQL 결과 -> S3
- (s3://grepp-data-engineering/{본인ID}-nps)
- s3://s3_bucket/s3_key
- S3ToRedshiftOperator
- S3 -> Redshift 테이블
- (s3://grepp-data-engineering/{본인ID}-nps) -> Redshift (본인스키마.nps)
- COPY command is used
- replace = True => 덮어쓰기 허용
MySQL 테이블의 Incremental Update 방식
- MySQL/PostgreSQL 테이블이라면 다음을 만족해야함
- created (timestamp): Optional
- modified (timestamp)
- deleted (boolean): 레코드를 삭제하지 않고 deleted를 True로 설정
- Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면
- ROW_NUMBER로 직접 구현하는 경우
- 먼저 Redshift의 A 테이블의 내용을 temp_A로 복사
- MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어다가 temp_A로 복사
- 아래는 MySQL에 보내는 쿼리. 결과를 파일로 저장한 후 S3로 업로드하고 COPY 수행
- SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
- temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 DESC 정렬해서, 일련번호가 1인 것들만 다시 A로 복사
- Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면
- S3ToRedshiftOperator로 구현하는 경우
- query 파라미터로 아래를 지정
- SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
- method 파라미터로 “UPSERT”를 지정
- upsert_keys 파라미터로 Primary key를 지정
- 앞서 nps 테이블이라면 “id” 필드를 사용
Backfill
- DAG 파라미터 max_active_runs를 이용해서 여러 날짜에대해 실행해도 한번에 하나만 연달아 실행되도록 할 수 있다.
CLI에서 Backfill실행
airflow dags backfill dag_id -s 2018-07-1 -e 2018-08-01
- catchup이 True로 설정되어 있어야한다.
- execution_date를 이용해서 Incremental update가 구현되어 있어야 한다.
- start_date부터 시작하지만 end_date는 포함하지 않는다.
- 실행순서는 날짜/시간순이 아니라 랜덤이다.
- 만일 날짜 순으로 하고 싶다면 DAG default_args의 depends_on_past를 True로 설정해야 한다.
DAG Backfill 준비
- 먼저 모든 DAG가 backfill을 필요로 하지는 않음
- Full Refresh를 한다면 backfill은 의미가 없음
- 여기서 backfill은 일별 혹은 시간별로 업데이트하는 경우를 의미함
- 마지막 업데이트 시간 기준 backfill을 하는 경우라면 (Data Warehouse 테이블에 기록된 시간 기준) 이런 경우에도 execution_date을 이용한 backfill은 필요하지 않음
- 데이터의 크기가 굉장히 커지면 backfill 기능을 구현해 두는 것이 필수
- airflow가 큰 도움이 됨
- 하지만 데이터 소스의 도움 없이는 불가능
- 어떻게 backfill로 구현할 것인가
- 제일 중요한 것은 데이터 소스가 backfill 방식을 지원해야함
- “execution_date”을 사용해서 업데이트할 데이터 결정
- “catchup” 필드를 True로 설정
- start_date/end_date을 backfill하려는 날짜로 설정
- 다음으로 중요한 것은 DAG 구현이 execution_date을 고려해야 하는 것이고 idempotent 해야함
Airflow란 무엇인가?
- Airflow는 파이썬으로 작성된 데이터 파이프라인 (ETL) 프레임웍
- 가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임웍
- Airflow에서 데이터 파이프라인을 DAG(Directed Acyclic Graph)라고 부름
- Airflow의 장점
- 데이터 파이프라인을 세밀하게 제어 가능
- 다양한 데이터 소스와 데이터 웨어하우스를 지원
- 백필(Backfill)이 쉬움
- Airflow 관련 중요 용어/개념
- start_date, execution_date, catchup
- 스케일링 방식
- Scale Up vs. Scale Out vs. 클라우드 버전 vs. K8s 사용
데이터 파이프라인 작성시 기억할 점
- 데이터 파이프라인에 관한 정보를 수집하는 것이 중요
- 비지니스 오너와 데이터 리니지에 주의할 것
- 결국 데이터 카탈로그가 필요
- 데이터 품질 체크
- 입력 데이터와 출력 데이터
- 코드 실패를 어설프게 복구하려는 것보다는 깔끔하게 실패하는 것이 좋음
- 가능하면 Full Refresh
- Incremental Update를 쓸 수 밖에 없다면 Backfill 방식을 먼저 생각해둘 것 -> Airflow가 필요한
이유
- 주기적인 청소 (데이터, 테이블, Dag)
'데브코스' 카테고리의 다른 글
데이터 엔지니어링 47일차 TIL (0) | 2024.06.05 |
---|---|
데이터 엔지니어링 46일차 TIL (1) | 2024.06.05 |
데이터 엔지니어링 44일차 TIL (1) | 2024.06.05 |
데이터 엔지니어링 43일차 TIL (0) | 2024.06.05 |
데이터 엔지니어링 42일차 TIL (0) | 2024.06.05 |