머신러닝 배우기

[머신러닝] 전처리 및 모델 Pipeline 파이프라인 구축하기 (#파이썬)

차근차근 디지털 2024. 1. 2. 12:09

데이터셋을 받았을 때, EDA를 한 후 어떤 데이터가 어떤 타입인지 확인을 하고 그에 맞는 변환 처리를 해줍니다. 결측치처리, outlier처리, 스케일링, 원핫인코딩, 라벨인코딩 등의 과정을 거쳐야합니다. 데이터의 특성과 목적에 맞게 변환을 해주어야 올바른 모델 성능이 나옵니다. 그리고 보통의 데이터들은 문자 숫자가 섞여있고 깔끔하게 정제되어 있는 경우가 거의 없습니다. 그렇기 때문에 받을 때마다 전처리를 해야합니다. 또한 모델 구축할 때 알맞은 파라미터를 찾기 위해 그리드 서치 기법을 사용하면서 모델 구축 및 하이퍼파라미터 튜닝까지 파이프라인에 태울 수 있습니다. 

 

Pipeline은 데이터 전처리 및 모델 구축 과정에서 여러 단계를 순차적으로 처리하는 데 사용되는 유용한 도구입니다. 파이프라인을 사용하면 좋은 점은 코드가 간결해지며, 다른 데이터셋이나 환경에서 재사용성이 가능해지며, 반복적인 작업을 줄일 수 있어 생산성이 향상됩니다. 오늘은 데이터 타입에 따라 다른 파이프라인을 만들고 이를 통합한 전처리 파이프라인을 만들어보도록 하겠습니다. 또한 모델 구축 및 최적의 하이퍼파라미터를 찾는 그리드 서치 방법도 파이프라인에 녹이는 방법을 알려드리겠습니다. 

 

파이프라인을 구축하기 전에 사용할 예제 데이터에 따라 데이터 타입을 구분하고 어떻게 처리하고 변환해야하는지 알아보겠습니다. 

 

▶︎ 이 예제로 가져가야할 것

데이터 타입별로 어떤 전처리 방식을 사용하는지 알아야함
사용자 정의 함수도 파이프라인에 넣을 수 있음
전처리 과정을 한 번에 파이프라인으로 구축할 수 있고 모델 생성과 튜닝까지도 파이프라인에 넣을 수 있음
원핫 인코딩할 때, handle_unknown 처리해야함. 그래야 학습 데이터 이후 새로운 데이터가 들어갈 때 0으로 변환되어 데이터 입력 값이 변하지 않음.

pipeline을 사용하여 전처리를 하면 inverse_transform를 사용할 수 없음

 

 

1. 필요한 라이브러리 불러오기

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random

from sklearn import datasets, model_selection, linear_model
from sklearn.metrics import mean_squared_error

from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import FunctionTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder, OrdinalEncoder

import warnings
warnings.filterwarnings("ignore")

 

2. 데이터셋 불러오기 (중고등학생들의 다음달 학원 등록 연장 여부에 따른 데이터를 임의로 만들어보았습니다.)

# 중고등학생들의 다음달 학원 등록 연장 여부에 따른 데이터를 임의로 만들어보았습니다.
df_train = pd.DataFrame({
                '성별':['남','남','여','남','여'],
                '혈액형':['A','B','O', 'AB','O'],
                '만족도':['좋음','보통','별로','좋음','좋음'],
                '성적등급':['A','C','B','A','D'],
                '용돈':[7000,150000,100000,50000,40000],
                '수학점수':[80,70,65,None,100],
                '영어점수':random.sample(range(101), 5),
                '등록연장여부':[1,0,0,1,1]})

df_test = pd.DataFrame({
                '성별':['남','여','남','남','여'],
                '혈액형':['A','B', 'AB','O','O'],
                '만족도':['좋음','보통','보통','별로','좋음'],
                '성적등급':['F','A','C','B','A'],
                '용돈':[10000,20000,1000000,None,70000],
                '수학점수':[80,70,65,30,100],
                '영어점수':random.sample(range(101), 5),
                '등록연장여부':[1,1,0,1,1]})

 

3. 데이터 타입에 따라 열 분류하기

nominal_features = ['성별','혈액형']
ordinal_features = ['만족도','성적등급']
discrete_features = ['수학점수','영어점수']
continuous_features = ['용돈']

 

4. 명목형 변수 전처리 파이프라인 구축

# nominal_features
nom_imputer = SimpleImputer(strategy='most_frequent')
        
onehot_encoder = OneHotEncoder(sparse=False, handle_unknown='ignore')

# 파이프라인 구축
nom_pipeline = Pipeline(
    steps=[
        ('nom_imputer', nom_imputer),
        ('onehot_encoder', onehot_encoder),
    ])

 

5. 순서형 변수 전처리 파이프라인 구축

# ordinal_features
ordinal_encoder = OrdinalEncoder(categories=[['별로', '보통', '좋음'], ['F','D','C', 'B', 'A']])

# 평균값으로 Nan값 채워주기
ord_imputer = SimpleImputer(strategy='mean')
        
# 파이프라인 구축
ord_pipeline = Pipeline(
    steps=[
        ('ordinal_encoder', ordinal_encoder),
        ('ord_imputer', ord_imputer),
    ])

 

6. 이산형 변수 전처리 파이프라인 구축

# discrete_features
disc_imputer = SimpleImputer(strategy='mean')

# 스케일링
disc_minmax_scaler = MinMaxScaler()
        
# 파이프라인 구축
disc_pipeline = Pipeline(
    steps=[
        ('disc_imputer', disc_imputer),
        ('disc_minmax_scaler', disc_minmax_scaler),
    ])

 

7. 연속형 변수 전처리 파이프라인 구축 (사용자 정의 함수도 파이프라인에 적용 가능)

# continuous_features
# 평균값으로 Nan값 채워주기
cont_imputer = SimpleImputer(strategy='mean')

# 예시: 데이터에 로그 스케일 적용
import numpy as np

def LogScaler(allowance):
    if isinstance(allowance, (int, float)):  # Check if 'allowance' is a single number
        if allowance < 1 or allowance is None:
            return 0
        else :
            return np.log(allowance)
    elif isinstance(allowance, np.ndarray):  # Check if 'allowance' is a NumPy array
        return np.log(np.maximum(allowance, 0))
    else:
        raise ValueError("Unsupported input type. 'allowance' should be a number or a NumPy array.")

# Define the custom transformer using FunctionTransformer
cont_log_scaler = FunctionTransformer(func=LogScaler, validate=False)
      
# 파이프라인 구축
cont_pipeline = Pipeline(
    steps=[
        ('cont_imputer', cont_imputer),
        ('cont_log_scaler', cont_log_scaler),
    ])

 

8. 파이프라인 합치기

preprocessor = ColumnTransformer(
    transformers=[
        ('nominal', nom_pipeline, nominal_features),
        ('ordinal', ord_pipeline, ordinal_features),
        ('discrete', disc_pipeline, discrete_features),
        ('continuous', cont_pipeline, continuous_features)
    ],
    remainder='passthrough')

 

9. 파이프라인 활용하여 데이터 학습 및 적용하기

train_transformed = preprocess.fit_transform(df_train)
test_transformed = preprocess.transform(df_test)

df_train_transformed = pd.DataFrame(train_transformed)
df_test_transformed = pd.DataFrame(test_transformed)

 

10. 모델에 적용하여 예측하기

from sklearn.ensemble import GradientBoostingClassifier

model = GradientBoostingClassifier(n_estimators=200, random_state=0)
model.fit(df_train_transformed.iloc[:, :-1], df_train_transformed[11]) # <- x_train_transformed (not x_train)

accuracy = model.score(df_test_transformed.iloc[:, :-1], df_test_transformed[11])
print("model score:", round(accuracy, 4))

 

이렇게 전처리 파이프라인 만드는 것이 끝났습니다. 이제 이 파이프라인을 통해 다른 전처리 작업을 할 때 뻔한 전처리 코드를 반복적으로 수행하지 않아도 되고 여러분들이 가져다쓰셔도 됩니다. 이렇게 전처리 전용 파이프라인을 만들면 모델과 독립적으로 적용되기 때문에 사용하고 싶은 모델을 직접 돌리시면 됩니다. 혹은 모델을 함께 파이프라인에 넣어 (전처리 + 모델 구축)까지 하나의 파이프라인으로 만들 수 있습니다. 하지만 보통 모델 한번에 완성본으로 내놓지 않죠. 여러 모델끼리 비교도 하고 최적의 하이퍼파라미터 조합도 찾습니다. 그래서 전처리 파이프라인만 따로 만들고, 다양한 모델과 하이퍼파라미터 조합을 통해 최고의 모델을 찾는 것을 추천합니다. 하지만 함께 파이프라인에 녹이는 법도 추가로 알려드리겠습니다. 

 

▶︎ 전처리 + 모델 구축

# (전처리+모델구축) 한 번에 하고 싶으면 요렇게
from sklearn.ensemble import GradientBoostingClassifier

model = Pipeline(steps=[('preprocessor', preprocessor),
                        ('classifier', GradientBoostingClassifier(n_estimators=200, random_state=0))])

model.fit(df_train.iloc[:,:-1], df_train.iloc[:,-1]) # <- x_train (not x_train_transformed)

accuracy = model.score(df_test.iloc[:,:-1], df_test.iloc[:,-1])
print("model score:", round(accuracy, 4))



▶︎ (전처리 + 모델 구축 + 최적의 하이퍼파라미터 찾기) 까지 한 번에!!!

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV

model = Pipeline(steps=[('preprocessor', preprocessor),
                        ('classifier', GradientBoostingClassifier())]) # removed hyper-params 

# model.get_params().keys() # 이거 실행해서 하이퍼파라미터 종류 찾고

param_grid = {
    'classifier__loss': ['deviance', 'exponential'],
    'classifier__learning_rate': [0.01, 0.001],
    'classifier__n_estimators': [200, 300, 400], 
    'classifier__min_samples_split': [2, 4],
    'classifier__max_depth': [2, 4],
    'classifier__random_state': [0]
}

grid_search = GridSearchCV(model, param_grid, 
                           refit=True, cv=3, n_jobs=1, verbose=1, scoring= 'accuracy')

grid_search.fit(df_train.iloc[:,:-1], df_train.iloc[:,-1])
print("Best params:", grid_search.best_params_)

accuracy = grid_search.score(df_test.iloc[:,:-1], df_test.iloc[:,-1])
print("\nmodel score:", round(accuracy, 4))

# Result
# Fitting 3 folds for each of 32 candidates, totalling 96 fits
# Best params: {'classifier__learning_rate': 0.01, 'classifier__loss': 'exponential', 'classifier__max_depth': 2, 'classifier__min_samples_split': 2, 'classifier__n_estimators': 200, 'classifier__random_state': 0}

# model score: 0.6
# 찾은 최적 조합 넣어서 모델 생성 classifier__ 삭제하고 넣기!
model = Pipeline(steps=[('preprocessor', preprocessor),
                        ('classifier', GradientBoostingClassifier(learning_rate=0.01, 
                                                                  loss='exponential', 
                                                                  max_depth=2, 
                                                                  min_samples_split=2, 
                                                                  n_estimators=200, 
                                                                  random_state=0))])
model.fit(df_train.iloc[:,:-1], df_train.iloc[:,-1])

accuracy = model.score(df_test.iloc[:,:-1], df_test.iloc[:,-1])
print("model score:", round(accuracy, 4))