주니어 데이터 엔지니어 우솨's 개발일지

데이터 엔지니어링 53일차 TIL 본문

데브코스

데이터 엔지니어링 53일차 TIL

우솨 2024. 6. 5. 19:41

학습내용

Dag란?
- DAG(Directed Acyclic Graph, 방향성 비순환 그래프)는 방향성이 있는 간선을 가지며 순환이 없는 그래프 구조이다.
- 주로 작업 스케줄링, 데이터 처리, 블록체인 등에서 의존성을 표현하고 관리하기 위해 사용한다.
-노드와 간선으로 이루어져 있으며, 각 간선은 특정 방향으로만 이동 가능하다

Dag를 실행하는 방법
- 주기적 실행 : schedule로 지정
- 다른 Dag에 의해 트리거
    - Explicit Trigger : Dag A가 분명하게 Dag B를 트리거(TriggerDagOperator)
    - Reactive Trigger : Dag B가 Dag A가 끝나기를 대기(ExternalTaskSensor)
        - Dag A는 Dag B가 기다리는 것을 모름
-알아두면 좋은 상황에 따라 다른 태스크 실행 방식들
    - BranchPythonOperator : 조건에 따라 다른 태스트로 분기
    - LatestOnlyOperator : 과거 데이터 Backfill시에는 불필요한 테스크 처리
    - 앞단 태스크들의 실행상황 : 어떤 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있다.

JinJa Template란?
: Python에서 널리 사용되는 템플릿 엔진으로서 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성(Flask에서 사용됨)
- 변수는 이중 중괄호 {{}}로 감싸서 사용(앞뒤에 공백을 넣어줘야한다!!)
    ex) <h1> 안녕하세요, {{ name }}님!</h1>
- 제어문은 퍼센트 기호 {% %}로 표시
    ex) <ul>
         {% for item in items %}
          <li> {{item}} </li>
         {% endfor %}
         </ul>

Ariflow에서의 Jinja template활용
- Airflow에서 Jinja템플릿을 사용하면 작업 이름, 파라미터, SQL 쿼리와 같은 작업 매개변수를 템플릿화된 문자열로 정의 가능하다
    - 이를 통해 재사용간으하고 사용자 정의 가능한 워크플로우를 생성한다
- execution_date을 코드 내에서 쉽게 사용 : {{ ds }}
- 파라미터 등으로 넘어온 변수를 쉽게 사용 가능하다
JinJa변수
- {{ ds }}(날짜), {{ params.name }}(변수), {{ ds_nodash }}, {{ ts }}, {{ dag }}, {{ task }}, {{ dag_run }}
- {{ var.value }} : {{ var.value_get('my.var','fallback') }}
- {{ var.json }} : {{ var.json.my_dict_var.key1 }}
- {{ conn }} : {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password }}

Sensor란 무엇인가?
- sensor는 특정조건이 충족될 때까지 대기하는 Operator
- sensor는 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상홍 동기화에 유용하다
- Airflow 내장 Sensor
    - FileSensor : 지정된 위치에 파일이 생길 때까지 대기
    - HttpSensor : HTTP 요청을 수행하고 지정된 응답이 대기
    - SqlSensor : SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
    - TimeSensor : 특정 시간에 도달할 때까지 워크플로우를 일시중지
    - ExternalTaskSensor : 다른 Airflow DAG의 특정 작업 완료를 대기
- 기본적으로 주기적으로 poke를 하는것
    - worker를 하나 붙잡고 poke간에 sleep을 할지 아니면 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재 : mode
        - mode의 값은 reschedule 혹은 poke된다

ExternalTaskSensor
- DAG B의 ExternalTaskSensor태스크가 DAG A의 특정 태스크가 끝났는지 체크한다
    - 먼저 동일한 schedule_interval을 사용한다
    - 이 경우 두 태스크들의 Execution Date가 동일해야한다
    - poke모드를 쓰는 것이 일반적이며, 조건과 DAG A관점에서 알 수 없기에 실수할 확률이 높다

BrachPythonOperator
- 상황에 따라 뒤에 실행되어야할 태스크를 동적으로 결정해주는 오퍼레이터
    - 미리 정해준 Operator들 중에 선택하는 형태로 돌아감
- TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있다

LatestOnlyOperator
- Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
- 현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단한다.

Trigger Rules란? (airflow.utils.tirgger_rule.TriggerRule)
- Upstream 태스크의 성공실패 상황에 따라 뒷단 태스크의 실행여부를 결정하고 싶다면?
    - 보통 앞단이 하나라도 실패하면 뒷 단의 태스크는 실행이 불가능하다.
=> Operator에 trigger_rule이란 파라미터로 결정 가능하다.
         - trigger_rule은 태스크에 주어지는 파라미터로 다음과 같은 값이 가능하다
- all_success(기본값) : 앞단의 모두가 성공했을 경우
- all_failed : 앞단의 모두가 실패한 경우
- all_done : 성공이던 실패던 다 끝난 경우
- one_failed : 앞의 태스크가 하나라도 실패하는 경우
- one_success : 앞의 태스크가 하나라도 성공하는 경우
- none_failed : 앞의 태스크가 하나라도 실패가 없는 경우
- none_failed_min_one_success : 앞의 태스크가 실패가 없으면서 하나라도 성공하는 경우

Airflow 메타데이터 DB 내용 살펴보기
- airflow:airflow로 Postgres에 로그인 가능
1. 컨테이너 접속 : docker exec -it airflow-airflow-webserver-1 sh
2. psql shell 접속 : psql -h postgres
3. 쉘에서 입력
    - 모든 테이블 확인 : \dt
    - 실행된 dag기록 확인 : select * FROM dag_run LIMIT 10;
    - 실행된 dag기록을 제거 : DELETE FROM dag_run WHERE dag_id = '기록을삭제하고싶은DAG’;

태스크 그룹핑의 필요성
- 태스크 수가 많은 DAG라면 태스크들을 성격에 따른 관리의 필요성이 존재한다
    - SubDAG이 사용되다가 Airflow2.0에서 나온 Task Grouping으로 넘어가는 추세이다
        - SubDAG란? 비슷한 일을 하는 태스크들을 SubDAG라는 Child Dag로 만들어서 관리
- 다수의 파일 처리를 하는 DAG라면
    - 파일 다운로드 태스크들/ 파일 체크 태스크/ 데이터 처리 태스크들로 구성한다

Dynamic Dag란?
: DAG코드를 개발자가 직접 코드를 작성하는게 아니라 코드라 작성하는 것

    - 개발자가 손으로 함으로써 생산성이 떨어지는 것과 실수하는 여지를 없애고 자동화를 하는 것
- 템플릿과 YAML을 기반으로 DAG를 동적으로 만드는 것
    - Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 앞서 만든 템플릿에 파라미터를 제공한다
- 이를 통해 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지
- DAG를 계속해서 만드는 것과 한 DAG안에서 태스크를 늘리는 것 사이의 밸런스 필요
    - 오너가 다르거나 태스크의 수가 너무 커지는 경우 DAG를 복제해나가는 것이 더 좋다