일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- PCCP
- 데이터 시각화
- 코테 연습
- Tableau
- AWS
- 코딩테스트
- superset
- HTML
- beuatifulsoup
- Spark
- Til
- 데이터 엔지니어
- 데브코스
- airflow
- SQL
- 슈퍼셋
- Snowflake
- VPC
- 코딩 테스트
- Selenium
- GCP
- cloud platform
- django
- Kafka
- 팀 프로젝트
- Today
- Total
주니어 데이터 엔지니어 우솨's 개발일지
데이터 엔지니어링 57일차 TIL 본문
학습 내용
Spark 데이터 시스템 아키텍처
데이터 병렬처리가 가능하려면?
1. 데이터가 먼저 분산되어야 한다
- 하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록(128MB)
- hdfs-site.xml에 있는 dfs.block.size 프로퍼티가 결정
- Spark에서는 이를 파티션(Partition)이라 부른다(128MB)
- spark.sql.files.maxPartitionBytes : HDFS등에 있는 파일을 읽어올 때만 적용됨
2. 나눠진 데이터를 각각 따로 동시 처리
- 맵리듀스에서 N개의 데이터 블록으로 구성된 파일 처리시 N개의 Map 태스크가 실행
- Spark에서는 파티션 단위로 메모리가 로드되어 Executor가 배정된다.
데이터를 나누기 -> 파티션 ->병렬처리
Spark 데이터 처리 흐름
- 데이터프레임은 작은 파티션들로 구성됨
- 데이터프레임은 한번 만들어지면 수정 불가
- 입력 데이터프레임을 원하는 결과 도출까지 다른 데이터 프레임으로 계속 변환
- sort, group by, filter, map, join 등
셔플링 : 파티션간에 데이터 이동이 필요한 경우 발생
- 셔플링이 발생하는 경우
- 명시적 파티션을 새롭게 하는 경우(파티션 수를 줄이기)
- 시스템에 의해 이뤄지는 셔플링
- ex) 그룹핑 등의 aggregation 또는 sorting
- 셔플링이 발생할 때 네트워크를 타고 데이터가 이동하게 됨
- 기본값은 200이며 최대 파티션 수 이다.
- spark.sql.shuffle.partitions이 결정한다.
- 오퍼레이션에 따라 파티션 수가 결정된다.
- random, hashing partition, range partition 등
- sorting의 경우 range partition을 사용
Spark 데이터 구조
- RDD(Resilient Distributed Dataset)
- 로우레벨 데이터로 클러스터내의 서버에 분산된 데이터를 지칭
- 레코드별로 존재하지만 스키마가 존재하지 않는다.
- 구조화된 데이터나 비구조화된 데이터 모두 지원
-Dataframe과 Dataset
- RDD 위에 만들어지며 RDD와는 달리 필드 정보(테이블)를 가지고 있다.
- Dataset은 타입정보가 존재하며 컴파일 언어에서 사용가능하다
- python : DataFrame 사용
- Scala / Java : Dataset 사용
RDD(Resilient Distributed Dataset)
- 변경이 불가능한 분산 저장된 데이터
- 다수의 파티션으로 구성
- 로우레벨의 함수형 변환지원(map, filter, flatMap 등)
- 일반 파이썬 데이터는 parallelize 함수로 RDD로 변환
- 반대의 경우는 collect로 파이썬 데이터로 변환가능하다
DataFrame
- 변경이 불가한 분산 저장된 데이터
- RDD와 다르게 관계형 데이터베이스 테이블처럼 컬럼으로 나눠 저장
- 판다스의 데이터 프레임 혹은 관계형 데이터베이스의 테이블과 거의 흡사
- 다양한 데이터소스 지원 : HDFS, Hive, 외부데이터베이스, RDD등
- 스칼라, 자바, 파이썬과 같은 언어에서 지원
Spark Session 생성
- Spark 프로그램의 시작은 Spark Session을 만드는것
- 프로그램마다 하나를 만들어 Spark Cluster와 통신 : Singleton 객체
- Spark2.0에서 처음 소개됨
- Spark Session을 통해 Spark이 제공해주는 다양한 기능을 사용
- DataFrame, SQL, Streaming, ML API 모두 이 객체로 통신
- config 메소드를 이용해 다양한 환경설정 가능
- 단 RDD와 관련된 작업은 SparkSession 밑의 SparkContext객체를 사용
Spark Session 환경 변수
- Spark Session을 만들때 다양한 환경설정이 가능
- executor별 메모리 : spark.executor.memory(기본값 :1g)
- executor별 cpu수 : spark.executor.cores(YARN에서는 기본값 1)
- driver 메모리 : spark.driver.memory(기본값: 1g)
- Shuffle후 partition의 수 : spark.sql.shuffle.partitions (기본값:최대 200)
- 사용하는 Resource Manager에 따라 환경변수가 많이 달라진다.
Spark Session 환경설정방법 4가지
1. 환경변수
2. SPARK_HOME/conf/spark_defaults.conf
3. spark-submit 명령의 커맨드라인 파라미터
4. SparkSession 만들때 지정 - SparkConf
- Spark은 기본으로 in-memory 카탈로그를 사용.
-스토리지 기반의 카탈로그를 쓰고 싶다면 SparkSession 설정할 때 enableHiveSupport()를 호출 (Hive metastore를 카탈로그로 사용하며 Hive UDF와 Hive 파일포맷 사용 가능)
Spark Session이 지원하는 데이터 소스
- spark.read(DataFrameReader)를 사용하여 데이터프레임으로 로드
- spark.write(DataFrameWriter)을 사용하여 데이터프레임을 저장
- 많이 사용되는 데이터 소스들
- HDFS파일 - CSV, JSON, Parquet, ORC, Text, Avro
- JDBC 관계형 데이터베이스
- 클라우드 기반 데이터 시스템
- 스트리밍 시스템
spark 세션 만들기
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark DataFrame #2')\
.getOrCreate()
spark에서 헤더가 없는 csv파일 불러오기
# schema를 만들어 헤더를 직접 작성
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType
schema = StructType([ \
StructField("cust_id", StringType(), True), \
StructField("item_id", StringType(), True), \
StructField("amount_spent", FloatType(), True)])
df = spark.read.schema(schema).csv("customer-orders.csv")
spark에서 컬럼명 변경
1. withColumnRenamed 방법
df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")
2. functions 사용
import pyspark.sql.functions as f
df_ca = df.groupBy("cust_id") \
.agg(f.sum('amount_spent').alias('sum'))
Regex를 이용한 텍스트를 파싱
\S : non-whitespace character(공백,탭이 아닌 문자)
\d : numeric character
"+"가 붙으면 하나이상의 문자열을 가리킨다.
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'
df_with_new_columns = transfer_cost_df\
.withColumn('week', regexp_extract('text', regex_str, 1))\
.withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
.withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
.withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
.withColumn('vendor', regexp_extract(col('text'), regex_str, 5))
=> 0이 아니라 1부터 시작 !!
csv로 변환
final_df.write.csv("extracted.csv")
json파일로 변환
final_df.write.format("json").save("extracted.json")
import pyspark.sql.functions as F 함수
F.col : 컬럼을 가져옴
F.trim : 앞뒤 공백제거
F.split : 문자열 나누기 (split과 같음)
F.explode : 리스트였던 레코드를 개별 원소의 집합으로 늘리는 것
'데브코스' 카테고리의 다른 글
데이터 엔지니어링 59일차 TIL (0) | 2024.06.30 |
---|---|
데이터 엔지니어링 58일차 TIL (0) | 2024.06.30 |
데이터 엔지니어링 56일차 TIL (0) | 2024.06.17 |
데이터 엔지니어링 55일차 TIL (1) | 2024.06.07 |
데이터 엔지니어링 54일차 TIL (0) | 2024.06.06 |