주니어 데이터 엔지니어 우솨's 개발일지

데이터 엔지니어링 57일차 TIL 본문

데브코스

데이터 엔지니어링 57일차 TIL

우솨 2024. 6. 30. 09:59

학습 내용

 

Spark 데이터 시스템 아키텍처



데이터 병렬처리가 가능하려면?
1. 데이터가 먼저 분산되어야 한다
        - 하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록(128MB)
                - hdfs-site.xml에 있는 dfs.block.size 프로퍼티가 결정
        - Spark에서는 이를 파티션(Partition)이라 부른다(128MB)
                - spark.sql.files.maxPartitionBytes : HDFS등에 있는 파일을 읽어올 때만 적용됨
2. 나눠진 데이터를 각각 따로 동시 처리
        - 맵리듀스에서 N개의 데이터 블록으로 구성된 파일 처리시 N개의 Map 태스크가 실행
        - Spark에서는 파티션 단위로 메모리가 로드되어 Executor가 배정된다.

데이터를 나누기 -> 파티션 ->병렬처리

 


Spark 데이터 처리 흐름
- 데이터프레임은 작은 파티션들로 구성됨
        - 데이터프레임은 한번 만들어지면 수정 불가
- 입력 데이터프레임을 원하는 결과 도출까지 다른 데이터 프레임으로 계속 변환
        - sort, group by, filter, map, join 등

셔플링 : 파티션간에 데이터 이동이 필요한 경우 발생
- 셔플링이 발생하는 경우
        - 명시적 파티션을 새롭게 하는 경우(파티션 수를 줄이기)
        - 시스템에 의해 이뤄지는 셔플링
                - ex) 그룹핑 등의 aggregation 또는 sorting
- 셔플링이 발생할 때 네트워크를 타고 데이터가 이동하게 됨
        - 기본값은 200이며 최대 파티션 수 이다.
        - spark.sql.shuffle.partitions이 결정한다.
        - 오퍼레이션에 따라 파티션 수가 결정된다.
                - random, hashing partition, range partition 등
                - sorting의 경우 range partition을 사용

Spark 데이터 구조
- RDD(Resilient Distributed Dataset)
        - 로우레벨 데이터로 클러스터내의 서버에 분산된 데이터를 지칭
        - 레코드별로 존재하지만 스키마가 존재하지 않는다.
                - 구조화된 데이터나 비구조화된 데이터 모두 지원
-Dataframe과 Dataset
        - RDD 위에 만들어지며 RDD와는 달리 필드 정보(테이블)를 가지고 있다.
        - Dataset은 타입정보가 존재하며 컴파일 언어에서 사용가능하다
- python : DataFrame 사용
- Scala / Java : Dataset 사용



RDD(Resilient Distributed Dataset)
- 변경이 불가능한 분산 저장된 데이터
        - 다수의 파티션으로 구성
        - 로우레벨의 함수형 변환지원(map, filter, flatMap 등)
- 일반 파이썬 데이터는 parallelize 함수로 RDD로 변환
        - 반대의 경우는 collect로 파이썬 데이터로 변환가능하다

DataFrame
- 변경이 불가한 분산 저장된 데이터
- RDD와 다르게 관계형 데이터베이스 테이블처럼 컬럼으로 나눠 저장
        - 판다스의 데이터 프레임 혹은 관계형 데이터베이스의 테이블과 거의 흡사
        - 다양한 데이터소스 지원 : HDFS, Hive, 외부데이터베이스, RDD등
- 스칼라, 자바, 파이썬과 같은 언어에서 지원

Spark Session 생성
- Spark 프로그램의 시작은 Spark Session을 만드는것
        - 프로그램마다 하나를 만들어 Spark Cluster와 통신 : Singleton 객체
        - Spark2.0에서 처음 소개됨
- Spark Session을 통해 Spark이 제공해주는 다양한 기능을 사용
        - DataFrame, SQL, Streaming, ML API 모두 이 객체로 통신
        - config 메소드를 이용해 다양한 환경설정 가능
        - 단 RDD와 관련된 작업은 SparkSession 밑의 SparkContext객체를 사용

Spark Session 환경 변수
- Spark Session을 만들때 다양한 환경설정이 가능
        - executor별 메모리 : spark.executor.memory(기본값 :1g)
        - executor별 cpu수 : spark.executor.cores(YARN에서는 기본값 1)
        - driver 메모리 : spark.driver.memory(기본값: 1g)
        - Shuffle후 partition의 수 : spark.sql.shuffle.partitions (기본값:최대 200)
        - 사용하는 Resource Manager에 따라 환경변수가 많이 달라진다.

Spark Session 환경설정방법 4가지
1. 환경변수
2. SPARK_HOME/conf/spark_defaults.conf
3. spark-submit 명령의 커맨드라인 파라미터
4. SparkSession 만들때 지정 - SparkConf
- Spark은 기본으로 in-memory 카탈로그를 사용.
-스토리지 기반의 카탈로그를 쓰고 싶다면 SparkSession 설정할 때 enableHiveSupport()를 호출 (Hive metastore를 카탈로그로 사용하며 Hive UDF와 Hive 파일포맷 사용 가능)

Spark Session이 지원하는 데이터 소스
- spark.read(DataFrameReader)를 사용하여 데이터프레임으로 로드
- spark.write(DataFrameWriter)을 사용하여 데이터프레임을 저장
- 많이 사용되는 데이터 소스들
        - HDFS파일 - CSV, JSON, Parquet, ORC, Text, Avro
        - JDBC 관계형 데이터베이스
        - 클라우드 기반 데이터 시스템
        - 스트리밍 시스템

spark 세션 만들기

from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark DataFrame #2')\
        .getOrCreate()



spark에서 헤더가 없는 csv파일 불러오기

# schema를 만들어 헤더를 직접 작성
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType

schema = StructType([ \
                     StructField("cust_id", StringType(), True), \
                     StructField("item_id", StringType(), True), \
                     StructField("amount_spent", FloatType(), True)])
df = spark.read.schema(schema).csv("customer-orders.csv")



spark에서 컬럼명 변경
1. withColumnRenamed 방법

df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")


2. functions 사용

import pyspark.sql.functions as f

df_ca = df.groupBy("cust_id") \
   .agg(f.sum('amount_spent').alias('sum'))



Regex를 이용한 텍스트를 파싱
\S : non-whitespace character(공백,탭이 아닌 문자)
\d : numeric character
"+"가 붙으면 하나이상의 문자열을 가리킨다.

from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'

df_with_new_columns = transfer_cost_df\
    .withColumn('week', regexp_extract('text', regex_str, 1))\
    .withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
    .withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
    .withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
    .withColumn('vendor', regexp_extract(col('text'), regex_str, 5))


=> 0이 아니라 1부터 시작 !!

csv로 변환

final_df.write.csv("extracted.csv")


json파일로 변환

final_df.write.format("json").save("extracted.json")


import pyspark.sql.functions as F 함수
F.col : 컬럼을 가져옴
F.trim : 앞뒤 공백제거
F.split : 문자열 나누기 (split과 같음)
F.explode : 리스트였던 레코드를 개별 원소의 집합으로 늘리는 것