Scikit Learn の Pipeline 周りまとめ - you1025/my_something_flagments GitHub Wiki

概要

scikit-learn で一連の処理を定義・実行するための Pipeline の使い方と簡単な例を記載する。
また scikit-learn に準拠した自作の処理を作成し Pipeline に追加して用いるための方法についても触れる。

データの準備

scikit-learn の乳がん診断データを用いる。

import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

# データの読み込み
breast_cancer = load_breast_cancer()
x = pd.DataFrame(data=breast_cancer.data, columns=breast_cancer.feature_names)
y = pd.Series(breast_cancer.target)

# 訓練とテスト用のデータに分割
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=1025)

Pipeline の作成

多段に処理をつなげた Pipeline を作成する事で一連の処理を実行でき、作成された Pipeline は scikit-learn の Estimator として利用できる。

Pipeline の作成には下記 2 通りの方法があり、それぞれについて記載する。

sklearn.pipeline.Pipeline

steps 引数にタプルで処理を追加する事で Pipeline を生成する。
タプルの 1 番目に処理の内容が分かるような名前を記載し、これを用いて Pipeline 中の各処理にアクセスする(ex. GridSearchCV)。

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Pipeline の定義
Pipeline(steps=[
    ("scaler_1", StandardScaler()),
    ("scaler_2", MinMaxScaler())
])
# Pipeline(steps=[('scaler_1', StandardScaler()), ('scaler_2', MinMaxScaler())])

同一 Pipeline に対して処理の名称が重複すると例外が発生する事に注意。

# ValueError 例外が発生する
Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("scaler", MinMaxScaler())
])
# ValueError: Names provided are not unique: ['scaler', 'scaler']

sklearn.pipeline.make_pipeline

引数に一連の処理を指定する事で Pipeline が生成される。
前述した Pipeline とどちらを用いても良いが make_pipeline の場合は処理に名前を付けなくても良く、処理クラス名の小文字が自動的に付与される。
同じ処理が含まれる場合は名称の後ろに連番が自動的に追加される事に注意。

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Pipeline の定義
make_pipeline(
    StandardScaler(),
    MinMaxScaler(),
    MinMaxScaler()
)
# Pipeline(steps=[('standardscaler', StandardScaler()),
#                 ('minmaxscaler-1', MinMaxScaler()),
#                 ('minmaxscaler-2', MinMaxScaler())])

使用例

例1: 主成分分析

主成分分析の前段階としてデータをスケーリングする。

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

# Pipeline の定義
# 標準化 -> PCA の流れ
pipe = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("pca", PCA(n_components=2))
])

# データ変換の実行
pipe.fit_transform(x_train)
# array([[-2.39384883, -1.54638694],
#        [-1.02870648,  0.83458927],
#        [-3.56214797, -0.55887146], ...

例2: Support Vector Machine

Support Vector Machine の学習に用いるデータの前処理としてスケーリングを実施。

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

# Pipeline の定義
# 標準化 -> 識別器 の流れ
pipe = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("svc", SVC())
])

# 学習の実行
pipe.fit(x_train, y_train)

# 予測の実行
pipe.predict(x_test)
# array([0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0,
#        0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 0,
#        0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1,
#        1, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
#        1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1,
#        1, 1, 1, 1])

GridSearchCV との組み合わせ

前述したように Pipeline は scikit-learn の Estimator として利用できるので GridSearchCV と組み合わせる事が可能。
また Cross Validation の各 Validation 毎に一連の処理を実行する事でリークの解消にもつながる。

パラメータチューニング

SVM のハイパーパラメータ gamma をグリッドサーチで探索する。
param_grid で指定するキーは処理名とパラメータ名の間をアンダースコア 2 つでつないで [処理名]__[パラメータ名] とする事に注意。

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV

# Pipeline の定義
pipe = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("svc", SVC(C=100))
])

# gamma パラメータの選択肢を指定してグリッドサーチを定義
# 処理 "svc" の "gamma" パラメータを指定するので "svc__gamma" をキーにする
param_grid = {
    "svc__gamma": [0.001, 0.01, 0.1, 1, 10, 100]
}
grid = GridSearchCV(estimator=pipe, param_grid=param_grid, cv=5)

# パラメータごとに学習を実行
grid.fit(x_train, y_train)

# 選択された最良パラメータ
grid.best_params_
# {'svc__gamma': 0.001}

モデル選択

モデル(ex. SVC, RandomForest)や前処理(ex. StandardScaler, MinMaxScaler)の選択、もしくはその有無も指定できる。

今回の例では下記の組み合わせが選択される。

  • 前処理: MinMaxScaler
  • モデル: SVC
  • gamma: 0.01
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

# Pipeline の定義
# グリッドサーチで処理を指定するのでここでは None としているが枠(scaler, classifier)だけは決めておく
pipe = Pipeline(steps=[
    ("scaler", None),
    ("classifier", None)
])

# SVM と RandomForest の両方のモデルからスコアの良い方を選択する
# SVM における前処理の選択肢に None(前処理なし) が指定できる事に注意
param_grid = [
    # SVM
    {
        "scaler": [None, StandardScaler(), MinMaxScaler()],
        "classifier": [SVC(C=100)],
        "classifier__gamma": [0.001, 0.01, 0.1, 1, 10, 100] # モデルの処理名である "classifier" を流用できる
    },

    # RandomForest
    {
        "scaler": [None],
        "classifier": [RandomForestClassifier()],
        "classifier__max_features": [1, 2, 3]
    }
]
grid = GridSearchCV(pipe, param_grid, cv=5, scoring="accuracy")

# 前処理/モデル/パラメータ 毎に学習を実行
grid.fit(x_train, y_train)

# 選択された最良の組み合わせ
grid.best_params_
# {'classifier': SVC(C=100, gamma=0.01),
#  'classifier__gamma': 0.01,
#  'scaler': MinMaxScaler()}

Pipeline で使える自作 Estimator の作成

sklearn.base.BaseEstimatorBase classes に含まれるクラス(MixIn)を継承させる事で自作の処理を作成可能。

例1: 交互作用特徴量の追加

総当り四則演算による特徴量を追加する処理クラス。

from sklearn.base import BaseEstimator, TransformerMixin
import featuretools as ft

class FourArithmeticOperationFeatures(BaseEstimator, TransformerMixin):
    def fit(self, x, y=None):
        return self

    def transform(self, x):
        x_copy = x.copy()

        # index 指定用のダミー項目を追加
        DUMMY_ID_NAME = "dummy_id"
        x_copy.insert(0, DUMMY_ID_NAME, range(x_copy.shape[0]))

        # 特徴量の追加
        es = ft.EntitySet(id="transformer")
        es.entity_from_dataframe(
            entity_id="x",
            dataframe=x_copy,
            index=DUMMY_ID_NAME
        )
        feature_matrix, _ = ft.dfs(
            entityset=es,
            target_entity="x",
            trans_primitives=["add_numeric", "subtract_numeric", "multiply_numeric", "divide_numeric"], # 四則演算ぜんぶ乗せ
            agg_primitives=[],
            max_depth=1
        )

        # index をオリジナルのものに変更
        feature_matrix.index = x.index

        return feature_matrix

変換の実施により 2205 列の DataFrame が得られる。

2205 列となる理由は下記。

  1. 30 列から 2 列を選択する組み合わせが 30 * 29 / 2 = 435 個
  2. 足し算, 引き算, 掛け算, 割り算x2(x/y と y/x の 2 パターン) により 5 つの演算
  3. 上記 1. 2. により 2205 = 435 * 5 + 30(オリジナルの列数)
faof = FourArithmeticOperationFeatures()
faof.fit_transform(x_train)
# 2205 列 の DataFrame が得られる

例2: LightGBM の重要度による特徴量選択

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.feature_selection import SelectorMixin
from sklearn.utils.validation import check_is_fitted
import lightgbm as lgb

class LgbmImportanceSelector(BaseEstimator, SelectorMixin):
    def __init__(self, importance_threshold, classifier=None):
        self.classifier = classifier
        self.importance_threshold = importance_threshold

    def fit(self, x, y):
        clf = self.classifier if self.classifier is not None else lgb.LGBMClassifier(objective="binary")
        clf.fit(x, y)

        # 元データの項目数
        self.n_features_ = x.shape[1]

        # 重要度が閾値以上となる項目の ID 一覧
        self.selected_indexes_ = pd.DataFrame(clf.feature_importances_, columns=["importance"]) \
                                    .sort_values(by="importance", ascending=False) \
                                    .query(f"importance >= {self.importance_threshold}") \
                                    .index \
                                    .values

        return self

    # DataFrame を返すように書き換え
    def transform(self, x):
        x_arr = super().transform(x)
        return pd.DataFrame(x_arr, columns=x.columns[self.selected_indexes_], index=x.index)

    def _get_support_mask(self):
        check_is_fitted(self, "selected_indexes_")

        # 選択された項目の ID に合致する箇所だけ True となる配列を mask として生成
        mask = np.array([ (column_index in self.selected_indexes_) for column_index in range(self.n_features_) ])

        return mask

特徴量の選択をして LightGBM でモデル作成。

from sklearn.pipeline import Pipeline
from sklearn.metrics import log_loss

pipe = Pipeline(steps=[
    ("feature_selection", LgbmImportanceSelector(importance_threshold=5)),
    ("classifier", lgb.LGBMClassifier(objective="binary"))
])
pipe.fit(x_train, y_train)
y_pred = pipe.predict(x_test)

logloss = log_loss(y_test, y_pred)
print(f"logloss: {logloss:.3f}")
# logloss: 1.515

参考