Data Engineering Day 75 TIL

우솨 2024. 7. 13. 12:38

What I learned

Building a Titanic survivor prediction model with Spark

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Titanic Classification via ML Pipeline and Model Selection") \
    .getOrCreate()

"""# 타이타닉 생존 예측 모델 만들기



"""

spark

!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/titanic.csv

!ls -tl

data = spark.read.csv('./titanic.csv', header=True, inferSchema=True)

data.printSchema()

data.show()

data.describe().show()

"""**데이터 클린업**:

*   PassengerID, Name, Ticket, Embarked는 사용하지 않을 예정 (아무 의미가 없음).
*   Cabin도 비어있는 값이 너무 많아서 사용하지 않을 예정
*   Age는 중요한 정보인데 비어있는 레코드들이 많아서 디폴트값을 채워줄 예정
*   모든 필드를 MinMaxScaler로 스케일
*   Gender의 경우 카테고리 정보이기에 숫자로 인코딩 필요


"""

final_data = data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])

final_data.show()

"""Age는 성별에 따른 평균값으로 채운다"""

from pyspark.sql.functions import col, when, avg
avg_ages = data.groupBy("Gender").agg(avg("Age").alias("avg_age"))
avg_ages.show()

# Turn the per-gender average ages into a dictionary
avg_age_dict = {row['Gender']: row['avg_age'] for row in avg_ages.collect()}
avg_age_dict

# Fill null Age values with the per-gender averages computed above
final_data = final_data.withColumn(
    "AgeImputed",
    when((col("Age").isNull()) & (col("Gender") == "male"), avg_age_dict['male'])
    .when((col("Age").isNull()) & (col("Gender") == "female"), avg_age_dict['female'])
    .otherwise(col("Age")))

final_data.show()

# Check rows where Age was null and verify the per-gender AgeImputed values
data_age_null = final_data.filter(col("Age").isNull())

# Inspect the result
data_age_null.show()

"""성별 정보 인코딩: male -> 0, female -> 1"""

from pyspark.ml.feature import StringIndexer

gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed')
gender_indexer_model = gender_indexer.fit(final_data)
final_data = gender_indexer_model.transform(final_data)

final_data.select("Gender", "GenderIndexed").show()
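
"""Note that StringIndexer assigns indices by descending label frequency by default, so male -> 0.0 above holds because "male" is the more common value in this dataset. A minimal sketch of pinning the mapping explicitly via stringOrderType:"""

# 'alphabetDesc' sorts labels Z->A, so 'male' -> 0.0 and 'female' -> 1.0 regardless of frequency
explicit_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndexed',
                                 stringOrderType='alphabetDesc')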

"""## 피쳐 벡터를 만들기"""

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed'], outputCol='features')
data_vec = assembler.transform(final_data)

data_vec.show()

"""Age와 Fare의 값을 스케일하는 것이 주요 목표"""

from pyspark.ml.feature import MinMaxScaler

scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")
scaler_model = scaler.fit(data_vec)
data_vec = scaler_model.transform(data_vec)

data_vec.select("features", "features_scaled").show()
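
"""A quick check that scaling worked (assumes Spark >= 3.0, where pyspark.ml.functions.vector_to_array is available). Indices follow the assembler's inputCols order, so arr[3] is Fare and arr[4] is AgeImputed:"""

from pyspark.ml.functions import vector_to_array

# Every scaled component should now lie in [0, 1]
scaled = data_vec.withColumn("arr", vector_to_array("features_scaled"))
scaled.selectExpr("min(arr[3]) as fare_min", "max(arr[3]) as fare_max",
                  "min(arr[4]) as age_min", "max(arr[4]) as age_max").show()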

"""## 훈련용과 테스트용 데이터를 나누고 binary classification 모델을 하나 만든다"""

train, test = data_vec.randomSplit([0.7, 0.3])  # pass seed= for a reproducible split

from pyspark.ml.classification import LogisticRegression

algo = LogisticRegression(featuresCol="features_scaled", labelCol="Survived")
model = algo.fit(train)

"""## 모델 성능 측정"""

predictions = model.transform(test)

predictions.groupby(['Survived']).count().collect()

predictions.groupby(['prediction']).count().collect()

predictions.select(['Survived','prediction', 'probability']).show()

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC')
evaluator.evaluate(predictions)
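
"""areaUnderROC is a ranking metric computed from the rawPrediction column; as a complementary check, accuracy can be computed from the hard predictions (a minimal sketch):"""

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy on the 0/1 predictions complements the ranking-based AUC
acc_evaluator = MulticlassClassificationEvaluator(labelCol='Survived',
                                                  predictionCol='prediction',
                                                  metricName='accuracy')
acc_evaluator.evaluate(predictions)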

import matplotlib.pyplot as plt

plt.figure(figsize=(5,5))
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(model.summary.roc.select('FPR').collect(),
         model.summary.roc.select('TPR').collect())
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()

"""ML Pipeline 만들기"""

from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, MinMaxScaler

# Gender: encode the categorical value as a number
stringIndexer = StringIndexer(inputCol="Gender", outputCol="GenderIndexed")

# Age: fill nulls with the overall mean (unlike the per-gender fill done manually above)
imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['AgeImputed'])

# Vectorize
inputCols = ['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed']
assembler = VectorAssembler(inputCols=inputCols, outputCol="features")

# MinMaxScaler
minmax_scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")

stages = [stringIndexer, imputer, assembler, minmax_scaler]

from pyspark.ml.classification import LogisticRegression

algo = LogisticRegression(featuresCol="features_scaled", labelCol="Survived")
lr_stages = stages + [algo]

lr_stages

from pyspark.ml import Pipeline
pipeline = Pipeline(stages = lr_stages)

df = data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])
df.show()

train, test = df.randomSplit([0.7, 0.3])

lr_model = pipeline.fit(train)
lr_predictions = lr_model.transform(test)
evaluator.evaluate(lr_predictions)

"""ML Tuning"""

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC')

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(algo.maxIter, [1, 5, 10])
             .build())

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5
)

# Run cross-validation over the maxIter grid
cvModel = cv.fit(train)
lr_cv_predictions = cvModel.transform(test)
evaluator.evaluate(lr_cv_predictions)

lr_cv_predictions.select("prediction", "Survived").show()

import pandas as pd

params = [{p.name: v for p, v in m.items()} for m in cvModel.getEstimatorParamMaps()]
pd.DataFrame.from_dict([
    {cvModel.getEvaluator().getMetricName(): metric, **ps}
    for ps, metric in zip(params, cvModel.avgMetrics)
])
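
"""To see which hyperparameter value won, the fitted CrossValidator exposes the best pipeline; its final stage is the fitted LogisticRegression model (a minimal sketch, assuming Spark 3.x where fitted models expose param getters):"""

# bestModel is a PipelineModel; the classifier sits in the last stage
best_lr = cvModel.bestModel.stages[-1]
print("best maxIter:", best_lr.getMaxIter())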

"""GBT Classifier"""

from pyspark.ml.classification import GBTClassifier

# Tree ensembles don't need scaled features, but reusing features_scaled keeps the shared stages unchanged
gbt = GBTClassifier(featuresCol="features_scaled", labelCol="Survived")
gbt_stages = stages + [gbt]

gbt_stages

from pyspark.ml import Pipeline
pipeline = Pipeline(stages = gbt_stages)

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 60])
             .addGrid(gbt.maxIter, [10, 20])
             .build())

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5
)

# Run cross-validation with the GBT pipeline and evaluate on the test set
cvModel = cv.fit(train)
gbt_cv_predictions = cvModel.transform(test)
evaluator.evaluate(gbt_cv_predictions)
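
"""As a closing check, the fitted GBT model exposes feature importances; the indices follow the assembler's inputCols order (a minimal sketch):"""

# The last pipeline stage is the fitted GBTClassificationModel
best_gbt = cvModel.bestModel.stages[-1]
for name, score in zip(inputCols, best_gbt.featureImportances.toArray()):
    print(f"{name}: {score:.3f}")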