Junior Data Engineer 우솨's Dev Journal

Data Engineering Day 52 TIL (데브코스)

우솨 2024. 6. 5. 19:40

What I Learned

Google Sheets integration
1. Enable the Google Sheets API, create a Google service account, and download its key as a JSON file
2. Share the sheet you want to manipulate with the email address generated for that service account
3. In the Airflow DAG, authenticate with the JSON file and manipulate the sheet (sketched below)
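
The course wraps step 3 in a gsheet plugin (used in SQL_to_sheet.py below), but a minimal sketch with the gspread library, assuming a placeholder key file path and reusing the sheet names that appear later in this post, might look like:

import gspread

# Authenticate with the service-account JSON key downloaded in step 1.
# The key file path below is a placeholder; the spreadsheet must already be
# shared with the service-account email (step 2).
gc = gspread.service_account(filename="/tmp/google-sheet-key.json")

sh = gc.open("spreadsheet-copy-testing")   # open the spreadsheet by title
ws = sh.worksheet("RedshiftToSheet")       # pick the target tab

print(ws.get_all_values())                 # read the tab's current contents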

Admin - Variables
Set google_sheet_access_token
    - Value = the Google service account's JSON key (the downloaded file's contents; read back in a DAG as sketched below)
Admin - Connections
Set aws_conn_id (S3)
Set redshift_dev_db (Redshift)
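
Inside a DAG the JSON stored in this Variable can be loaded and written out as a key file before authenticating. A minimal sketch, assuming the Variable holds the raw service-account JSON and using an arbitrary output path:

import json

from airflow.models import Variable

# Load the service-account JSON stored under the google_sheet_access_token key
# and materialize it as a key file for the Google client libraries.
# The output path is an arbitrary choice for illustration.
key = Variable.get("google_sheet_access_token", deserialize_json=True)

with open("/tmp/google-sheet-key.json", "w") as f:
    json.dump(key, f)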

redshift_summary.py - build a summary table under the analytics schema in Redshift

from airflow import DAG
from airflow.macros import *

from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator

import logging
from glob import glob
"""
Build a summary table under analytics schema in Redshift
- Check the input tables readiness first
- Run the SQL using a temp table
- Before swapping, check the size of the temp table
- Finally swap
"""
def load_all_jsons_into_list(path_to_json):
    """Read every table config under the given directory.
    Each config is a Python file containing a single dict literal."""
    configs = []
    for f_name in glob(path_to_json + '/*.py'):
        # logging.info(f_name)
        with open(f_name) as f:
            dict_text = f.read()
            try:
                table_conf = eval(dict_text)  # evaluate the dict literal
            except Exception as e:
                logging.info(str(e))
                raise
            else:
                configs.append(table_conf)

    return configs


def find(table_name, table_confs):
    """
    scan through table_confs and see if there is a table matching table_name
    """
    for table in table_confs:
        if table.get("table") == table_name:
            return table

    return None


def build_summary_table(dag_root_path, dag, tables_load, redshift_conn_id, start_task=None):
    logging.info(dag_root_path)
    table_confs = load_all_jsons_into_list(dag_root_path + "/config/")

    if start_task is not None:
        prev_task = start_task
    else:
        prev_task = None

    for table_name in tables_load:

        table = find(table_name, table_confs)
        summarizer = RedshiftSummaryOperator(
            table=table["table"],
            schema=table["schema"],
            redshift_conn_id=redshift_conn_id,
            input_check=table["input_check"],
            main_sql=table["main_sql"],
            output_check=table["output_check"],
            overwrite=table.get("overwrite", True),
            after_sql=table.get("after_sql"),
            pre_sql=table.get("pre_sql"),
            attributes=table.get("attributes", ""),
            dag=dag,
            task_id="anayltics"+"__"+table["table"]
        )
        if prev_task is not None:
            prev_task >> summarizer
        prev_task = summarizer
    return prev_task


def redshift_sql_function(**context):
    """this is a main Python callable function which runs a given SQL
    """

    sql=context["params"]["sql"]
    print(sql)
    hook = PostgresHook(postgres_conn_id=context["params"]["redshift_conn_id"])
    hook.run(sql, True)


class RedshiftSummaryOperator(PythonOperator):
    """
    Create a summary table in Redshift
    :param input_check: a list of input tables to check to make sure
                        they are fully populated. the list is composed
                        of sql (select) and minimum count
    :type input_check: a list of sql and count
    :param main_sql: a main sql to create a summary table. this should
                     use a temp table. this sql can have more than one 
                     statement
    :type main_sql: string
    :param output_check: output validation. It is a list of sql (select)
                         and minimum count
    :type output_check: a list of sql and count
    :param overwrite: Currently this only supports overwriting (True).
                      Once False is supported, it will append to the table
    :type overwrite: boolean
    """

    @apply_defaults
    def __init__(self,
                 schema,
                 table,
                 redshift_conn_id,
                 input_check,
                 main_sql,
                 output_check,
                 overwrite,
                 params={},
                 pre_sql="",
                 after_sql="",
                 attributes="",
                 *args,
                 **kwargs
                 ):
        self.schema = schema
        self.table = table
        self.redshift_conn_id = redshift_conn_id
        self.input_check = input_check
        self.main_sql = main_sql
        self.output_check = output_check

        # compose temp table creation, insert into the temp table as params
        if pre_sql:
            main_sql = pre_sql
            if not main_sql.endswith(";"):
                main_sql += ";"
        else:
            main_sql = ""

        main_sql += "DROP TABLE IF EXISTS {schema}.temp_{table};".format(
            schema=self.schema,
            table=self.table
        )
        # now we are using "CREATE TABLE ... AS SELECT" syntax
        # we used to create a temp table with the same schema as the main table and then insert into the temp table 
        main_sql += "CREATE TABLE {schema}.temp_{table} {attributes} AS ".format(
            schema=self.schema,
            table=self.table,
            attributes=attributes
        ) + self.main_sql

        if after_sql:
            self.after_sql = after_sql.format(
                schema=self.schema,
                table=self.table
            )
        else:
            self.after_sql = ""

        super(RedshiftSummaryOperator, self).__init__(
            python_callable=redshift_sql_function,
            params={
                "sql": main_sql,
                "overwrite": overwrite,
                "redshift_conn_id": self.redshift_conn_id
            },
            provide_context=True,
            *args,
            **kwargs
        )

    def swap(self):
        sql = """BEGIN;
        DROP TABLE IF EXISTS {schema}.{table} CASCADE;
        ALTER TABLE {schema}.temp_{table} RENAME TO {table};   
        GRANT SELECT ON TABLE {schema}.{table} TO GROUP analytics_users;
        END
        """.format(schema=self.schema,table=self.table)
        self.hook.run(sql, True)

    def execute(self, context):
        """Do input_check first
        - input_check should be a list of dictionaries
        - each item in the dictionary contains "sql" and "count"
        """
        self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        for item in self.input_check:
            (cnt,) = self.hook.get_first(item["sql"])
            if cnt < item["count"]:
                raise AirflowException(
                    "Input Validation Failed for " + str(item["sql"]))

        """
        - create a temp table using create table like
        - run insert into the temp table
        """
        return_value = super(RedshiftSummaryOperator, self).execute(context)

        """Do output_check using self.output_check
        """
        for item in self.output_check:
            (cnt,) = self.hook.get_first(item["sql"].format(schema=self.schema, table=self.table))
            if item.get("op") == 'eq':
                if int(cnt) != int(item["count"]):
                    raise AirflowException(
                        "Output Validation of 'eq' Failed for " + str(item["sql"]) + ": " + str(cnt) + " vs. " + str(item["count"])
                    )
            else:
                if cnt < item["count"]:
                    raise AirflowException(
                        "Output Validation Failed for " + str(item["sql"]) + ": " + str(cnt) + " vs. " + str(item["count"])
                    )
        """Now swap the temp table name
        """
        self.swap()

        if self.after_sql:
            self.hook.run(self.after_sql, True)

        return return_value
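
The config files that load_all_jsons_into_list reads are Python files, each containing a single dict literal with the keys the operator consumes above. A hypothetical config/nps_summary.py, feeding the analytics.nps_summary table queried in SQL_to_sheet.py below, might look like this (the SQL and count thresholds are illustrative, not the actual course config):

{
    "table": "nps_summary",
    "schema": "analytics",
    "main_sql": """SELECT created_at::date AS date,
           ROUND((COUNT(CASE WHEN score >= 9 THEN 1 END)::float
                  - COUNT(CASE WHEN score <= 6 THEN 1 END)::float)
                 / COUNT(1) * 100, 2) AS nps
    FROM raw_data.nps
    GROUP BY 1;""",
    "input_check": [
        {"sql": "SELECT COUNT(1) FROM raw_data.nps", "count": 150000}
    ],
    "output_check": [
        {"sql": "SELECT COUNT(1) FROM {schema}.temp_{table}", "count": 12}
    ],
    "overwrite": True
}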



SQL_to_sheet.py - copy the result of a Redshift query to a Google Sheet

from airflow import DAG
from airflow.operators.python import PythonOperator

from plugins import gsheet
from datetime import datetime

def update_gsheet(**context):
    sql = context["params"]["sql"]
    sheetfilename = context["params"]["sheetfilename"]
    sheetgid = context["params"]["sheetgid"]

    gsheet.update_sheet(sheetfilename, sheetgid, sql, "redshift_dev_db")


with DAG(
    dag_id = 'SQL_to_Sheet',
    start_date = datetime(2022,6,18),
    catchup=False,
    tags=['example'],
    schedule = '@once'
) as dag:

    sheet_update = PythonOperator(
        dag=dag,
        task_id='update_sql_to_sheet1',
        python_callable=update_gsheet,
        params = {
            "sql": "SELECT date, nps FROM analytics.nps_summary order by date",
            "sheetfilename": "spreadsheet-copy-testing",
            "sheetgid": "RedshiftToSheet"
        }
    )
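
plugins/gsheet.py is provided by the course and its code is not shown here; a rough sketch of what update_sheet presumably does (run the SQL against Redshift, then overwrite the target worksheet), assuming gspread and the key file path used earlier, could be:

import gspread

from airflow.hooks.postgres_hook import PostgresHook


def update_sheet(sheetfilename, sheetgid, sql, conn_id):
    # Hypothetical sketch: run `sql` on Redshift and write the result into
    # the worksheet `sheetgid` of the spreadsheet `sheetfilename`.
    hook = PostgresHook(postgres_conn_id=conn_id)
    df = hook.get_pandas_df(sql)           # query result as a pandas DataFrame

    gc = gspread.service_account(filename="/tmp/google-sheet-key.json")
    ws = gc.open(sheetfilename).worksheet(sheetgid)

    ws.clear()                             # drop the previous contents
    ws.update(values=[df.columns.tolist()] + df.astype(str).values.tolist(),
              range_name="A1")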



Enabling the Airflow API
- In airflow.cfg, change the value of auth_backend in the [api] section
      [api]
    - auth_backend = airflow.api.auth.backend.basic_auth
- In docker-compose.yaml this is already configured (under environment)
    - AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'

Examples of using the Airflow API
Calling the /health API: curl -X GET --user "monitor:MonitorUser1" http://localhost:8080/health

1. Trigger a specific DAG (example)
    curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"

2. List all DAGs
    curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags

3. List all Variables
    curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables

4. List the full config - this is blocked by default (the request fails with a 404 error)
    curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
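
The same endpoints can also be called from Python with the requests library; a small sketch using the airflow:airflow basic-auth credentials from the curl examples above:

import requests

BASE = "http://localhost:8080/api/v1"
AUTH = ("airflow", "airflow")      # the basic-auth credentials used above

# List all DAGs (same as example 2)
resp = requests.get(f"{BASE}/dags", auth=AUTH)
print(resp.json())

# Trigger the HelloWorld DAG (same as example 1)
resp = requests.post(
    f"{BASE}/dags/HelloWorld/dagRuns",
    auth=AUTH,
    json={"execution_date": "2023-05-24T00:00:00Z"},
)
print(resp.status_code, resp.json())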

Export Variables and Connections to JSON files
airflow variables export variables.json
airflow connections export connections.json

Import the saved Variables and Connections from the JSON files back into Airflow
airflow variables import variables.json
airflow connections import connections.json
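
These backups can also be scripted; a minimal sketch with subprocess, assuming the airflow CLI is available where the script runs (e.g. inside the scheduler container):

import subprocess

# Back up Variables and Connections to JSON files for later import.
for resource in ("variables", "connections"):
    subprocess.run(
        ["airflow", resource, "export", f"{resource}.json"],
        check=True,
    )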