Junior Data Engineer 우솨's Dev Diary
Data Engineering Day 75 TIL
What I learned
Building a Titanic survivor prediction model with Spark
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Titanic Classification via ML Pipeline and Model Selection") \
    .getOrCreate()
"""# 타이타닉 생존 예측 모델 만들기
"""
spark
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/titanic.csv
!ls -tl
data = spark.read.csv('./titanic.csv', header=True, inferSchema=True)
data.printSchema()
data.show()
data.select(['*']).describe().show()
"""**데이터 클린업**:
* PassengerID, Name, Ticket, Embarked는 사용하지 않을 예정 (아무 의미가 없음).
* Cabin도 비어있는 값이 너무 많아서 사용하지 않을 예정
* Age는 중요한 정보인데 비어있는 레코드들이 많아서 디폴트값을 채워줄 예정
* 모든 필드를 MinMaxScaler로 스케일
* Gender의 경우 카테고리 정보이기에 숫자로 인코딩 필요
"""
final_data = data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])
final_data.show()
"""Age는 성별에 따른 평균값으로 채운다"""
from pyspark.sql.functions import col, when, avg
avg_ages = data.groupBy("Gender").agg(avg("Age").alias("avg_age"))
avg_ages.show()
# Convert the per-gender average ages into a dictionary
avg_age_dict = {row['Gender']: row['avg_age'] for row in avg_ages.collect()}
avg_age_dict
# Fill missing ages with the per-gender average computed above
final_data = final_data.withColumn(
    "AgeImputed",
    when((col("Age").isNull()) & (col("Gender") == "male"), avg_age_dict['male'])
    .when((col("Age").isNull()) & (col("Gender") == "female"), avg_age_dict['female'])
    .otherwise(col("Age")))
final_data.show()
# Check the rows where Age was null and verify AgeImputed by gender
data_age_null = final_data.filter(col("Age").isNull())
data_age_null.show()
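As an aside, the same per-gender fill can also be done without collecting the averages to the driver, by joining avg_ages back in and coalescing. A minimal sketch, equivalent in result to the when/otherwise version above:
from pyspark.sql.functions import coalesce
# Join the per-gender averages and fall back to them where Age is null
alt_imputed = (data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])
               .join(avg_ages, on='Gender', how='left')
               .withColumn('AgeImputed', coalesce(col('Age'), col('avg_age')))
               .drop('avg_age'))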
"""성별 정보 인코딩: male -> 0, female -> 1"""
from pyspark.ml.feature import StringIndexer
gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed')
gender_indexer_model = gender_indexer.fit(final_data)
final_data = gender_indexer_model.transform(final_data)
final_data.select("Gender", "GenderIndexed").show()
"""## 피쳐 벡터를 만들기"""
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed'], outputCol='features')
data_vec = assembler.transform(final_data)
data_vec.show()
"""Age와 Fare의 값을 스케일하는 것이 주요 목표"""
from pyspark.ml.feature import MinMaxScaler
age_scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")
age_scaler_model = age_scaler.fit(data_vec)
data_vec = age_scaler_model.transform(data_vec)
data_vec.select("features", "features_scaled").show()
"""## 훈련용과 테스트용 데이터를 나누고 binary classification 모델을 하나 만든다"""
train, test = data_vec.randomSplit([0.7, 0.3])
from pyspark.ml.classification import LogisticRegression
algo = LogisticRegression(featuresCol="features_scaled", labelCol="Survived")
model = algo.fit(train)
"""## 모델 성능 측정"""
predictions = model.transform(test)
predictions.groupby(['Survived']).count().collect()
predictions.groupby(['prediction']).count().collect()
predictions.select(['Survived','prediction', 'probability']).show()
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC')
evaluator.evaluate(predictions)
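AUC can be complemented with plain accuracy. A minimal sketch using MulticlassClassificationEvaluator, which also handles binary labels:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator(
    labelCol='Survived', predictionCol='prediction', metricName='accuracy')
acc_evaluator.evaluate(predictions)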
import matplotlib.pyplot as plt
plt.figure(figsize=(5, 5))
plt.plot([0, 1], [0, 1], 'r--')
# Note: model.summary.roc is the ROC curve computed on the training data
plt.plot(model.summary.roc.select('FPR').collect(),
         model.summary.roc.select('TPR').collect())
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()
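To plot the ROC for the test set instead, the probabilities can be pulled into pandas. A sketch that assumes scikit-learn is available (it is in Colab):
from sklearn.metrics import roc_curve
pdf = predictions.select('Survived', 'probability').toPandas()
y_score = pdf['probability'].apply(lambda v: float(v[1]))  # P(Survived = 1)
fpr, tpr, _ = roc_curve(pdf['Survived'], y_score)
plt.plot(fpr, tpr)
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()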
"""ML Pipeline 만들기"""
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, MinMaxScaler
# Gender
stringIndexer = StringIndexer(inputCol="Gender", outputCol="GenderIndexed")
# Age: fill missing values with the overall mean (simpler than the per-gender fill above)
imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['AgeImputed'])
# Vectorize
inputCols = ['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed']
assembler = VectorAssembler(inputCols=inputCols, outputCol="features")
# MinMaxScaler
minmax_scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")
stages = [stringIndexer, imputer, assembler, minmax_scaler]
from pyspark.ml.classification import LogisticRegression
algo = LogisticRegression(featuresCol="features_scaled", labelCol="Survived")
lr_stages = stages + [algo]
lr_stages
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=lr_stages)
df = data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])
df.show()
train, test = df.randomSplit([0.7, 0.3])
lr_model = pipeline.fit(train)
lr_predictions = lr_model.transform(test)
evaluator.evaluate(lr_predictions)
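A fitted pipeline can be saved and reloaded as a single unit, which keeps the preprocessing and the model together. A sketch with a hypothetical path:
from pyspark.ml import PipelineModel
lr_model.write().overwrite().save('titanic_lr_pipeline')  # hypothetical path
reloaded = PipelineModel.load('titanic_lr_pipeline')
reloaded.transform(test).select('prediction').show(5)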
"""ML Tuning"""
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC')
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(algo.maxIter, [1, 5, 10])
             .build())
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5
)
# Run cross validations.
cvModel = cv.fit(train)
lr_cv_predictions = cvModel.transform(test)
evaluator.evaluate(lr_cv_predictions)
lr_cv_predictions.select("prediction", "Survived").show()
import pandas as pd
params = [{p.name: v for p, v in m.items()} for m in cvModel.getEstimatorParamMaps()]
pd.DataFrame.from_dict([
    {cvModel.getEvaluator().getMetricName(): metric, **ps}
    for ps, metric in zip(params, cvModel.avgMetrics)
])
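CrossValidator also keeps the model refit on the full training set with the best parameter combination. The winning maxIter can be read off the last pipeline stage (assuming Spark 3.x, where fitted models expose the estimator's param getters):
# bestModel is a PipelineModel; its last stage is the fitted LogisticRegression
best_lr = cvModel.bestModel.stages[-1]
best_lr.getMaxIter()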
"""GBT Classifier"""
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(featuresCol="features_scaled", labelCol="Survived")
gbt_stages = stages + [gbt]
gbt_stages
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=gbt_stages)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 60])
             .addGrid(gbt.maxIter, [10, 20])
             .build())
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5
)
# Run cross validations.
cvModel = cv.fit(train)
gbt_cv_predictions = cvModel.transform(test)
evaluator.evaluate(gbt_cv_predictions)
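The fitted GBT model also exposes feature importances, which line up with the assembler's inputCols order. A quick sketch:
# Last stage of the best pipeline is the fitted GBT model
best_gbt = cvModel.bestModel.stages[-1]
for name, imp in zip(inputCols, best_gbt.featureImportances.toArray()):
    print(name, imp)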