KR_Scikit - somaz94/python-study GitHub Wiki
Scikit-learn is the most widely used machine learning library in Python. It provides a wide range of algorithms, preprocessing tools, and model evaluation techniques, all accessible through a consistent API.
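As a quick, hedged illustration of that consistent API (a minimal sketch assuming only scikit-learn and its bundled iris dataset), every estimator follows the same fit/predict pattern:

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression

# Every scikit-learn estimator exposes the same fit/predict interface.
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

clf = LogisticRegression(max_iter=1000)
clf.fit(X_train, y_train)  # learn from the training split
print(f"Test accuracy: {clf.score(X_test, y_test):.3f}")  # evaluate on held-out data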
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import load_iris, fetch_california_housing  # load_boston was removed in scikit-learn 1.2
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.pipeline import Pipeline
from typing import Tuple, Dict, List, Any, Optional, Union
class DatasetPreparation:
"""
๋จธ์ ๋ฌ๋ ๋ชจ๋ธ ํ์ต์ ์ํ ๋ฐ์ดํฐ์
์ค๋น ํด๋์ค
"""
def __init__(self, random_state: int = 42):
"""
์ด๊ธฐํ
Args:
random_state: ๋๋ค ์๋
"""
self.random_state = random_state
self.X_train = None
self.X_test = None
self.y_train = None
self.y_test = None
self.scaler = None
self.imputer = None
def load_dataset(self, dataset_name: str = 'iris') -> Tuple[np.ndarray, np.ndarray]:
"""
์ํ ๋ฐ์ดํฐ์
๋ก๋
Args:
dataset_name: ๋ฐ์ดํฐ์
์ด๋ฆ ('iris', 'boston', 'california')
Returns:
Tuple: (X, y) ํํ์ ํน์ฑ๊ณผ ํ๊ฒ ๋ฐ์ดํฐ
"""
if dataset_name == 'iris':
dataset = load_iris()
X, y = dataset.data, dataset.target
print(f"Iris ๋ฐ์ดํฐ์
๋ก๋: {X.shape[0]} ์ํ, {X.shape[1]} ํน์ฑ, {len(np.unique(y))} ํด๋์ค")
elif dataset_name == 'boston':
# Note: load_boston was removed in scikit-learn 1.2; this branch requires an older version
from sklearn.datasets import load_boston
dataset = load_boston()
X, y = dataset.data, dataset.target
print(f"Boston housing dataset loaded: {X.shape[0]} samples, {X.shape[1]} features")
elif dataset_name == 'california':
dataset = fetch_california_housing()
X, y = dataset.data, dataset.target
print(f"California ์ฃผํ ๋ฐ์ดํฐ์
๋ก๋: {X.shape[0]} ์ํ, {X.shape[1]} ํน์ฑ")
else:
# Generate custom random data (example)
X = np.random.rand(100, 4)  # 100 samples, 4 features
y = np.random.randint(0, 2, 100)  # labels for binary classification
print(f"Random data generated: {X.shape[0]} samples, {X.shape[1]} features")
return X, y
def split_data(self, X: np.ndarray, y: np.ndarray, test_size: float = 0.2) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
ํ์ต ๋ฐ ํ
์คํธ ๋ฐ์ดํฐ ๋ถํ
Args:
X: ํน์ฑ ๋ฐ์ดํฐ
y: ํ๊ฒ ๋ฐ์ดํฐ
test_size: ํ
์คํธ ์ธํธ ๋น์จ
Returns:
Tuple: (X_train, X_test, y_train, y_test) ํํ์ ๋ถํ ๋ ๋ฐ์ดํฐ
"""
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=self.random_state
)
self.X_train, self.X_test = X_train, X_test
self.y_train, self.y_test = y_train, y_test
print(f"๋ฐ์ดํฐ ๋ถํ : ํ์ต {X_train.shape[0]} ์ํ, ํ
์คํธ {X_test.shape[0]} ์ํ")
return X_train, X_test, y_train, y_test
def scale_data(self, scaler_type: str = 'standard') -> Tuple[np.ndarray, np.ndarray]:
"""
๋ฐ์ดํฐ ์ค์ผ์ผ๋ง
Args:
scaler_type: ์ค์ผ์ผ๋ฌ ์ ํ ('standard', 'minmax', 'robust')
Returns:
Tuple: (X_train_scaled, X_test_scaled) ํํ์ ์ค์ผ์ผ๋ง๋ ๋ฐ์ดํฐ
"""
if self.X_train is None or self.X_test is None:
raise ValueError("๋จผ์ split_data๋ฅผ ํธ์ถํ์ฌ ๋ฐ์ดํฐ๋ฅผ ๋ถํ ํด์ผ ํฉ๋๋ค")
if scaler_type == 'standard':
self.scaler = StandardScaler()
print("StandardScaler ์ ์ฉ: ํ๊ท =0, ํ์คํธ์ฐจ=1")
elif scaler_type == 'minmax':
self.scaler = MinMaxScaler()
print("MinMaxScaler ์ ์ฉ: ๋ฒ์=[0,1]")
elif scaler_type == 'robust':
self.scaler = RobustScaler()
print("RobustScaler ์ ์ฉ: ์ค์๊ฐ=0, IQR ๊ธฐ๋ฐ ์ค์ผ์ผ๋ง")
else:
raise ValueError("์ง์๋์ง ์๋ ์ค์ผ์ผ๋ฌ ์ ํ์
๋๋ค")
X_train_scaled = self.scaler.fit_transform(self.X_train)
X_test_scaled = self.scaler.transform(self.X_test)
return X_train_scaled, X_test_scaled
def handle_missing_values(self, X: np.ndarray, strategy: str = 'mean') -> np.ndarray:
"""
๊ฒฐ์ธก์น ์ฒ๋ฆฌ
Args:
X: ํน์ฑ ๋ฐ์ดํฐ
strategy: ๋์ฒด ์ ๋ต ('mean', 'median', 'most_frequent', 'constant')
Returns:
np.ndarray: ๊ฒฐ์ธก์น๊ฐ ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ
"""
self.imputer = SimpleImputer(strategy=strategy)
X_imputed = self.imputer.fit_transform(X)
print(f"๊ฒฐ์ธก์น ์ฒ๋ฆฌ: {strategy} ์ ๋ต ์ฌ์ฉ")
return X_imputed
def encode_categorical(self, X: np.ndarray, categorical_cols: List[int]) -> np.ndarray:
"""
๋ฒ์ฃผํ ๋ณ์ ์ธ์ฝ๋ฉ
Args:
X: ํน์ฑ ๋ฐ์ดํฐ
categorical_cols: ๋ฒ์ฃผํ ๋ณ์์ ์ธ๋ฑ์ค ๋ชฉ๋ก
Returns:
np.ndarray: ์ธ์ฝ๋ฉ๋ ๋ฐ์ดํฐ
"""
encoder = OneHotEncoder(sparse_output=False, drop='first')  # 'sparse' was renamed to 'sparse_output' in scikit-learn 1.2
# Extract the categorical columns
X_cat = X[:, categorical_cols]
# Extract the numerical columns
X_num = np.delete(X, categorical_cols, axis=1)
# Encode the categorical columns
X_cat_encoded = encoder.fit_transform(X_cat)
# Combine the encoded columns with the numerical columns
X_encoded = np.hstack([X_num, X_cat_encoded])
print(f"Categorical encoding: {len(categorical_cols)} columns one-hot encoded")
return X_encoded
def create_pipeline(self, scaler_type: str = 'standard', impute_strategy: str = 'mean') -> Pipeline:
"""
์ ์ฒ๋ฆฌ ํ์ดํ๋ผ์ธ ์์ฑ
Args:
scaler_type: ์ค์ผ์ผ๋ฌ ์ ํ
impute_strategy: ๊ฒฐ์ธก์น ๋์ฒด ์ ๋ต
Returns:
Pipeline: Scikit-learn ์ ์ฒ๋ฆฌ ํ์ดํ๋ผ์ธ
"""
steps = []
# Add the imputation step
steps.append(('imputer', SimpleImputer(strategy=impute_strategy)))
# Add the scaling step
if scaler_type == 'standard':
steps.append(('scaler', StandardScaler()))
elif scaler_type == 'minmax':
steps.append(('scaler', MinMaxScaler()))
elif scaler_type == 'robust':
steps.append(('scaler', RobustScaler()))
pipeline = Pipeline(steps)
print(f"์ ์ฒ๋ฆฌ ํ์ดํ๋ผ์ธ ์์ฑ: {' -> '.join([step[0] for step in steps])}")
return pipeline
def visualize_data(self, X: np.ndarray, y: np.ndarray, feature_names: Optional[List[str]] = None) -> None:
"""
๋ฐ์ดํฐ ์๊ฐํ
Args:
X: ํน์ฑ ๋ฐ์ดํฐ
y: ํ๊ฒ ๋ฐ์ดํฐ
feature_names: ํน์ฑ ์ด๋ฆ ๋ชฉ๋ก
"""
if feature_names is None:
feature_names = [f'Feature_{i}' for i in range(X.shape[1])]
# Inspect feature distributions
plt.figure(figsize=(12, 8))
for i in range(min(X.shape[1], 8)):  # show at most 8 features
plt.subplot(2, 4, i+1)
plt.hist(X[:, i], bins=20)
plt.title(feature_names[i])
plt.tight_layout()
plt.show()
# Inspect correlations (up to 10 features)
if X.shape[1] > 1:
plt.figure(figsize=(10, 8))
corr_matrix = np.corrcoef(X[:, :min(X.shape[1], 10)], rowvar=False)
sns.heatmap(corr_matrix, annot=True, fmt='.2f', cmap='coolwarm',
xticklabels=feature_names[:min(X.shape[1], 10)],
yticklabels=feature_names[:min(X.shape[1], 10)])
plt.title('Feature correlations')
plt.tight_layout()
plt.show()
# Inspect the target distribution
plt.figure(figsize=(8, 5))
if len(np.unique(y)) <= 10:  # classification problem
plt.hist(y, bins=len(np.unique(y)))
plt.xticks(np.unique(y))
plt.title('Class distribution')
else:  # regression problem
plt.hist(y, bins=30)
plt.title('Target variable distribution')
plt.xlabel('Value')
plt.ylabel('Frequency')
plt.show()
# Usage example
if __name__ == "__main__":
# Create the data preparation object
data_prep = DatasetPreparation(random_state=42)
# Load data
X, y = data_prep.load_dataset('iris')
# Split data
X_train, X_test, y_train, y_test = data_prep.split_data(X, y, test_size=0.2)
# Scale data
X_train_scaled, X_test_scaled = data_prep.scale_data('standard')
# Create a preprocessing pipeline
pipeline = data_prep.create_pipeline(scaler_type='standard', impute_strategy='mean')
# Visualize data
data_prep.visualize_data(X, y, feature_names=['sepal length', 'sepal width', 'petal length', 'petal width'])
print("Data preparation complete!")
✅ Features:
- Loads or generates a variety of datasets
- Train/test splitting to prepare for model evaluation
- Multiple scaling methods (standardization, min-max normalization, robust scaling)
- Several strategies for handling missing values (mean, median, most frequent)
- One-hot encoding for categorical data
- Type hints for better code readability
- Pipelines that automate the preprocessing steps
- Integrated data visualization utilities
- Class-based design for reusability
- Consistent preprocessing with state tracking
Supervised learning is the main paradigm of machine learning, in which a model learns a mapping from inputs to outputs using labeled data. Scikit-learn provides a wide range of supervised learning algorithms for classification, regression, multi-output problems, and more.
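Before the full trainer class below, here is a minimal, hedged sketch of that supervised workflow for a regression target (assuming the bundled California housing dataset, which fetch_california_housing downloads on first use):

from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score

# Learn a mapping from housing features to a continuous price target.
X, y = fetch_california_housing(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

reg = RandomForestRegressor(n_estimators=50, random_state=42)
reg.fit(X_train, y_train)
y_pred = reg.predict(X_test)
print(f"MSE: {mean_squared_error(y_test, y_pred):.3f}, R²: {r2_score(y_test, y_pred):.3f}")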
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, learning_curve
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
# Classification models
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
# Regression models
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from typing import Dict, List, Any, Optional, Tuple, Union, Callable
import time
import joblib
class SupervisedModelTrainer:
"""
๋ค์ํ ์ง๋ ํ์ต ๋ชจ๋ธ์ ํ๋ จํ๊ณ ํ๊ฐํ๋ ํด๋์ค
"""
def __init__(self, random_state: int = 42):
"""
์ด๊ธฐํ
Args:
random_state: ๋๋ค ์๋ ์ค์
"""
self.random_state = random_state
self.models = {}
self.trained_models = {}
self.results = {}
def add_classification_models(self) -> None:
"""
๊ธฐ๋ณธ ๋ถ๋ฅ ๋ชจ๋ธ ์ถ๊ฐ
"""
self.models = {
'logistic_regression': LogisticRegression(random_state=self.random_state, max_iter=1000),
'decision_tree': DecisionTreeClassifier(random_state=self.random_state),
'random_forest': RandomForestClassifier(n_estimators=100, random_state=self.random_state),
'svm': SVC(kernel='rbf', probability=True, random_state=self.random_state),
'knn': KNeighborsClassifier(n_neighbors=5),
'naive_bayes': GaussianNB(),
'gradient_boosting': GradientBoostingClassifier(n_estimators=100, random_state=self.random_state)
}
print(f"{len(self.models)}๊ฐ ๋ถ๋ฅ ๋ชจ๋ธ ์ถ๊ฐ๋จ")
def add_regression_models(self) -> None:
"""
๊ธฐ๋ณธ ํ๊ท ๋ชจ๋ธ ์ถ๊ฐ
"""
self.models = {
'linear_regression': LinearRegression(),
'ridge': Ridge(alpha=1.0, random_state=self.random_state),
'lasso': Lasso(alpha=0.1, random_state=self.random_state),
'elastic_net': ElasticNet(alpha=0.1, l1_ratio=0.5, random_state=self.random_state),
'decision_tree': DecisionTreeRegressor(random_state=self.random_state),
'random_forest': RandomForestRegressor(n_estimators=100, random_state=self.random_state),
'svr': SVR(kernel='rbf'),
'gradient_boosting': GradientBoostingRegressor(n_estimators=100, random_state=self.random_state)
}
print(f"{len(self.models)}๊ฐ ํ๊ท ๋ชจ๋ธ ์ถ๊ฐ๋จ")
def add_custom_model(self, name: str, model: Any) -> None:
"""
์ฌ์ฉ์ ์ ์ ๋ชจ๋ธ ์ถ๊ฐ
Args:
name: ๋ชจ๋ธ ์ด๋ฆ
model: ๋ชจ๋ธ ๊ฐ์ฒด
"""
self.models[name] = model
print(f"์ฌ์ฉ์ ์ ์ ๋ชจ๋ธ '{name}' ์ถ๊ฐ๋จ")
def train_models(self, X_train: np.ndarray, y_train: np.ndarray, verbose: bool = True) -> Dict[str, Any]:
"""
๋ชจ๋ ๋ชจ๋ธ ํ๋ จ
Args:
X_train: ํ๋ จ ํน์ฑ ๋ฐ์ดํฐ
y_train: ํ๋ จ ํ๊ฒ ๋ฐ์ดํฐ
verbose: ์์ธ ์ถ๋ ฅ ์ฌ๋ถ
Returns:
Dict: ํ๋ จ๋ ๋ชจ๋ธ ๋์
๋๋ฆฌ
"""
if not self.models:
raise ValueError("๋จผ์ ๋ชจ๋ธ์ ์ถ๊ฐํด์ผ ํฉ๋๋ค. add_classification_models() ๋๋ add_regression_models()๋ฅผ ํธ์ถํ์ธ์.")
self.trained_models = {}
for name, model in self.models.items():
if verbose:
print(f"'{name}' ๋ชจ๋ธ ํ๋ จ ์ค...")
start_time = time.time()
model.fit(X_train, y_train)
training_time = time.time() - start_time
self.trained_models[name] = {
'model': model,
'training_time': training_time
}
if verbose:
print(f" ํ๋ จ ์๋ฃ: {training_time:.2f}์ด")
print(f"{len(self.trained_models)}๊ฐ ๋ชจ๋ธ ํ๋ จ ์๋ฃ")
return self.trained_models
def evaluate_classification_models(self, X_test: np.ndarray, y_test: np.ndarray, verbose: bool = True) -> Dict[str, Dict[str, Any]]:
"""
๋ถ๋ฅ ๋ชจ๋ธ ํ๊ฐ
Args:
X_test: ํ
์คํธ ํน์ฑ ๋ฐ์ดํฐ
y_test: ํ
์คํธ ํ๊ฒ ๋ฐ์ดํฐ
verbose: ์์ธ ์ถ๋ ฅ ์ฌ๋ถ
Returns:
Dict: ๋ชจ๋ธ๋ณ ํ๊ฐ ๊ฒฐ๊ณผ
"""
if not self.trained_models:
raise ValueError("๋จผ์ ๋ชจ๋ธ์ ํ๋ จํด์ผ ํฉ๋๋ค. train_models()๋ฅผ ํธ์ถํ์ธ์.")
self.results = {}
for name, model_info in self.trained_models.items():
model = model_info['model']
# Make predictions
start_time = time.time()
y_pred = model.predict(X_test)
prediction_time = time.time() - start_time
# Compute performance metrics
accuracy = accuracy_score(y_test, y_pred)
report = classification_report(y_test, y_pred, output_dict=True)
# Store the results
self.results[name] = {
'accuracy': accuracy,
'precision': report['weighted avg']['precision'],
'recall': report['weighted avg']['recall'],
'f1_score': report['weighted avg']['f1-score'],
'training_time': model_info['training_time'],
'prediction_time': prediction_time,
'full_report': report
}
if verbose:
print(f"\n--- {name} ๋ชจ๋ธ ํ๊ฐ ๊ฒฐ๊ณผ ---")
print(f"์ ํ๋: {accuracy:.4f}")
print(f"์ ๋ฐ๋(๊ฐ์คํ๊ท ): {report['weighted avg']['precision']:.4f}")
print(f"์ฌํ์จ(๊ฐ์คํ๊ท ): {report['weighted avg']['recall']:.4f}")
print(f"F1 ์ ์(๊ฐ์คํ๊ท ): {report['weighted avg']['f1-score']:.4f}")
print(f"ํ๋ จ ์๊ฐ: {model_info['training_time']:.2f}์ด")
print(f"์์ธก ์๊ฐ: {prediction_time:.2f}์ด")
print("\n๋ชจ๋ธ ์ฑ๋ฅ ๋น๊ต (์ ํ๋ ๊ธฐ์ค)")
for name, result in sorted(self.results.items(), key=lambda x: x[1]['accuracy'], reverse=True):
print(f"{name}: {result['accuracy']:.4f}")
return self.results
def evaluate_regression_models(self, X_test: np.ndarray, y_test: np.ndarray, verbose: bool = True) -> Dict[str, Dict[str, Any]]:
"""
ํ๊ท ๋ชจ๋ธ ํ๊ฐ
Args:
X_test: ํ
์คํธ ํน์ฑ ๋ฐ์ดํฐ
y_test: ํ
์คํธ ํ๊ฒ ๋ฐ์ดํฐ
verbose: ์์ธ ์ถ๋ ฅ ์ฌ๋ถ
Returns:
Dict: ๋ชจ๋ธ๋ณ ํ๊ฐ ๊ฒฐ๊ณผ
"""
if not self.trained_models:
raise ValueError("๋จผ์ ๋ชจ๋ธ์ ํ๋ จํด์ผ ํฉ๋๋ค. train_models()๋ฅผ ํธ์ถํ์ธ์.")
self.results = {}
for name, model_info in self.trained_models.items():
model = model_info['model']
# Make predictions
start_time = time.time()
y_pred = model.predict(X_test)
prediction_time = time.time() - start_time
# Compute performance metrics
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
# Store the results
self.results[name] = {
'mse': mse,
'rmse': rmse,
'mae': mae,
'r2_score': r2,
'training_time': model_info['training_time'],
'prediction_time': prediction_time
}
if verbose:
print(f"\n--- {name} ๋ชจ๋ธ ํ๊ฐ ๊ฒฐ๊ณผ ---")
print(f"MSE: {mse:.4f}")
print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"Rยฒ ์ ์: {r2:.4f}")
print(f"ํ๋ จ ์๊ฐ: {model_info['training_time']:.2f}์ด")
print(f"์์ธก ์๊ฐ: {prediction_time:.2f}์ด")
print("\n๋ชจ๋ธ ์ฑ๋ฅ ๋น๊ต (Rยฒ ์ ์ ๊ธฐ์ค)")
for name, result in sorted(self.results.items(), key=lambda x: x[1]['r2_score'], reverse=True):
print(f"{name}: {result['r2_score']:.4f}")
return self.results
def plot_classification_results(self) -> None:
"""
๋ถ๋ฅ ๋ชจ๋ธ ๊ฒฐ๊ณผ ์๊ฐํ
"""
if not self.results:
raise ValueError("๋จผ์ ๋ชจ๋ธ์ ํ๊ฐํด์ผ ํฉ๋๋ค. evaluate_classification_models()๋ฅผ ํธ์ถํ์ธ์.")
# ์ ํ๋ ๋น๊ต
plt.figure(figsize=(12, 6))
models = list(self.results.keys())
accuracy = [result['accuracy'] for result in self.results.values()]
plt.barh(models, accuracy, color='skyblue')
plt.xlabel('Accuracy')
plt.title('Accuracy by model')
plt.xlim(0, 1)
for i, v in enumerate(accuracy):
plt.text(v + 0.01, i, f"{v:.4f}", va='center')
plt.tight_layout()
plt.show()
# Training and prediction time comparison
plt.figure(figsize=(12, 6))
training_time = [result['training_time'] for result in self.results.values()]
prediction_time = [result['prediction_time'] for result in self.results.values()]
x = np.arange(len(models))
width = 0.35
plt.barh(x - width/2, training_time, width, label='Training time', color='lightblue')
plt.barh(x + width/2, prediction_time, width, label='Prediction time', color='lightgreen')
plt.yticks(x, models)
plt.xlabel('Time (seconds)')
plt.title('Training and prediction time by model')
plt.legend()
plt.tight_layout()
plt.show()
def plot_regression_results(self) -> None:
"""
ํ๊ท ๋ชจ๋ธ ๊ฒฐ๊ณผ ์๊ฐํ
"""
if not self.results:
raise ValueError("๋จผ์ ๋ชจ๋ธ์ ํ๊ฐํด์ผ ํฉ๋๋ค. evaluate_regression_models()๋ฅผ ํธ์ถํ์ธ์.")
# Rยฒ ์ ์ ๋น๊ต
plt.figure(figsize=(12, 6))
models = list(self.results.keys())
r2_scores = [result['r2_score'] for result in self.results.values()]
plt.barh(models, r2_scores, color='skyblue')
plt.xlabel('R² score')
plt.title('R² score by model')
plt.xlim(0, 1)
for i, v in enumerate(r2_scores):
plt.text(v + 0.01, i, f"{v:.4f}", va='center')
plt.tight_layout()
plt.show()
# RMSE comparison
plt.figure(figsize=(12, 6))
rmse_values = [result['rmse'] for result in self.results.values()]
plt.barh(models, rmse_values, color='salmon')
plt.xlabel('RMSE')
plt.title('RMSE by model')
for i, v in enumerate(rmse_values):
plt.text(v + 0.01, i, f"{v:.4f}", va='center')
plt.tight_layout()
plt.show()
def plot_learning_curve(self, model_name: str, X: np.ndarray, y: np.ndarray, cv: int = 5) -> None:
"""
ํ์ต ๊ณก์ ์๊ฐํ
Args:
model_name: ๋ชจ๋ธ ์ด๋ฆ
X: ์ ์ฒด ํน์ฑ ๋ฐ์ดํฐ
y: ์ ์ฒด ํ๊ฒ ๋ฐ์ดํฐ
cv: ๊ต์ฐจ ๊ฒ์ฆ ํด๋ ์
"""
if model_name not in self.trained_models:
raise ValueError(f"'{model_name}' ๋ชจ๋ธ์ด ํ๋ จ๋์ง ์์์ต๋๋ค.")
model = self.trained_models[model_name]['model']
plt.figure(figsize=(10, 6))
train_sizes, train_scores, test_scores = learning_curve(
model, X, y, cv=cv, n_jobs=-1,
train_sizes=np.linspace(0.1, 1.0, 10),
scoring='accuracy' if hasattr(model, 'predict_proba') else 'r2'
)
train_mean = np.mean(train_scores, axis=1)
train_std = np.std(train_scores, axis=1)
test_mean = np.mean(test_scores, axis=1)
test_std = np.std(test_scores, axis=1)
plt.plot(train_sizes, train_mean, 'o-', color='blue', label='Training score')
plt.fill_between(train_sizes, train_mean - train_std, train_mean + train_std, alpha=0.1, color='blue')
plt.plot(train_sizes, test_mean, 'o-', color='green', label='Cross-validation score')
plt.fill_between(train_sizes, test_mean - test_std, test_mean + test_std, alpha=0.1, color='green')
plt.xlabel('Number of training samples')
plt.ylabel('Score')
plt.title(f'Learning curve for the {model_name} model')
plt.legend(loc='best')
plt.grid(True)
plt.tight_layout()
plt.show()
def save_model(self, model_name: str, filename: str) -> None:
"""
๋ชจ๋ธ ์ ์ฅ
Args:
model_name: ์ ์ฅํ ๋ชจ๋ธ ์ด๋ฆ
filename: ์ ์ฅํ ํ์ผ ๊ฒฝ๋ก
"""
if model_name not in self.trained_models:
raise ValueError(f"'{model_name}' ๋ชจ๋ธ์ด ํ๋ จ๋์ง ์์์ต๋๋ค.")
model = self.trained_models[model_name]['model']
joblib.dump(model, filename)
print(f"'{model_name}' ๋ชจ๋ธ์ด '{filename}'์ ์ ์ฅ๋์์ต๋๋ค.")
def load_model(self, model_name: str, filename: str) -> Any:
"""
๋ชจ๋ธ ๋ก๋
Args:
model_name: ๋ก๋ํ ๋ชจ๋ธ ์ด๋ฆ
filename: ๋ก๋ํ ํ์ผ ๊ฒฝ๋ก
Returns:
Any: ๋ก๋๋ ๋ชจ๋ธ
"""
model = joblib.load(filename)
self.trained_models[model_name] = {'model': model, 'training_time': 0}
print(f"'{filename}'์์ '{model_name}' ๋ชจ๋ธ์ ๋ก๋ํ์ต๋๋ค.")
return model
def get_best_model(self, metric: str = 'accuracy') -> Tuple[str, Any]:
"""
์ต๊ณ ์ฑ๋ฅ ๋ชจ๋ธ ๋ฐํ
Args:
metric: ํ๊ฐ ์งํ ('accuracy', 'f1_score', 'r2_score', 'rmse' ๋ฑ)
Returns:
Tuple: (๋ชจ๋ธ ์ด๋ฆ, ๋ชจ๋ธ ๊ฐ์ฒด)
"""
if not self.results:
raise ValueError("๋จผ์ ๋ชจ๋ธ์ ํ๊ฐํด์ผ ํฉ๋๋ค.")
# ์งํ๊ฐ ๋์์๋ก ์ข์ ๊ฒฝ์ฐ (accuracy, f1, r2 ๋ฑ)
if metric in ['accuracy', 'precision', 'recall', 'f1_score', 'r2_score']:
best_model_name = max(self.results, key=lambda x: self.results[x][metric])
# Metrics where lower is better (mse, rmse, mae, etc.)
elif metric in ['mse', 'rmse', 'mae']:
best_model_name = min(self.results, key=lambda x: self.results[x][metric])
else:
raise ValueError(f"์ง์๋์ง ์๋ ํ๊ฐ ์งํ: {metric}")
best_model = self.trained_models[best_model_name]['model']
best_score = self.results[best_model_name][metric]
print(f"์ต๊ณ ์ฑ๋ฅ ๋ชจ๋ธ: {best_model_name} ({metric}: {best_score:.4f})")
return best_model_name, best_model
# Usage example
if __name__ == "__main__":
# Prepare the data
from sklearn.datasets import load_breast_cancer
data = load_breast_cancer()
X, y = data.data, data.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Scale the features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Train and evaluate the models
trainer = SupervisedModelTrainer(random_state=42)
trainer.add_classification_models()
trainer.train_models(X_train_scaled, y_train)
results = trainer.evaluate_classification_models(X_test_scaled, y_test)
# Visualize the results
trainer.plot_classification_results()
# Inspect the best model
best_model_name, best_model = trainer.get_best_model('accuracy')
# Inspect the learning curve
trainer.plot_learning_curve(best_model_name, X, y)
# Save the model
trainer.save_model(best_model_name, f'best_model_{best_model_name}.joblib')
✅ Features:
- Unified interface for classification and regression models
- Supports many algorithms (logistic regression, decision trees, random forests, SVM, KNN, etc.)
- Automated model training and evaluation
- Computes a range of performance metrics (accuracy, precision, recall, F1 score, MSE, RMSE, R², etc.)
- Visualization and comparison of model performance
- Learning curves for diagnosing overfitting and underfitting
- Selection and saving of the best-performing model
- Training and prediction time measurement for efficiency comparison
- Type hints for better code readability
- Extensible design that supports user-defined models
Unsupervised learning is the branch of machine learning that finds patterns in unlabeled data. Scikit-learn provides a variety of unsupervised algorithms for clustering, dimensionality reduction, anomaly detection, and more.
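As a minimal, hedged sketch of that workflow (using a synthetic dataset from make_blobs rather than real data), clustering and dimensionality reduction follow the same fit/fit_transform pattern:

from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score

# Cluster unlabeled points, then project them to 2D for plotting.
X, _ = make_blobs(n_samples=300, n_features=6, centers=4, random_state=42)

labels = KMeans(n_clusters=4, n_init=10, random_state=42).fit_predict(X)
print(f"Silhouette score: {silhouette_score(X, labels):.3f}")

X_2d = PCA(n_components=2).fit_transform(X)  # 2D projection of the 6D data
print(X_2d.shape)  # (300, 2)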
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.decomposition import PCA, TruncatedSVD, NMF
from sklearn.manifold import TSNE, Isomap
from sklearn.metrics import silhouette_score, calinski_harabasz_score
from sklearn.neighbors import NearestNeighbors
from sklearn.ensemble import IsolationForest
from sklearn.mixture import GaussianMixture
from typing import Dict, List, Any, Optional, Tuple, Union
import time
import matplotlib.cm as cm
class UnsupervisedLearning:
"""
๋น์ง๋ ํ์ต ์๊ณ ๋ฆฌ์ฆ ์ ์ฉ ๋ฐ ์๊ฐํ ํด๋์ค
"""
def __init__(self, random_state: int = 42):
"""
์ด๊ธฐํ
Args:
random_state: ๋๋ค ์๋
"""
self.random_state = random_state
self.data = None
self.scaled_data = None
self.cluster_labels = {}
self.reduced_data = {}
self.scaler = None
def load_data(self, X: np.ndarray) -> np.ndarray:
"""
๋ฐ์ดํฐ ๋ก๋ ๋ฐ ์ ์ฅ
Args:
X: ํน์ฑ ๋ฐ์ดํฐ
Returns:
np.ndarray: ์ ์ฅ๋ ๋ฐ์ดํฐ
"""
self.data = X
print(f"๋ฐ์ดํฐ ๋ก๋: {X.shape[0]} ์ํ, {X.shape[1]} ํน์ฑ")
return self.data
def scale_data(self, method: str = 'standard') -> np.ndarray:
"""
๋ฐ์ดํฐ ์ค์ผ์ผ๋ง
Args:
method: ์ค์ผ์ผ๋ง ๋ฐฉ๋ฒ ('standard', 'minmax')
Returns:
np.ndarray: ์ค์ผ์ผ๋ง๋ ๋ฐ์ดํฐ
"""
if self.data is None:
raise ValueError("๋จผ์ load_data๋ฅผ ํธ์ถํ์ฌ ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํด์ผ ํฉ๋๋ค.")
if method == 'standard':
self.scaler = StandardScaler()
print("StandardScaler ์ ์ฉ: ํ๊ท =0, ํ์คํธ์ฐจ=1")
elif method == 'minmax':
self.scaler = MinMaxScaler()
print("MinMaxScaler ์ ์ฉ: ๋ฒ์=[0,1]")
else:
raise ValueError("์ง์๋์ง ์๋ ์ค์ผ์ผ๋ง ๋ฐฉ๋ฒ์
๋๋ค.")
self.scaled_data = self.scaler.fit_transform(self.data)
return self.scaled_data
def find_optimal_clusters(self, max_clusters: int = 10, method: str = 'elbow') -> int:
"""
์ต์ ์ ํด๋ฌ์คํฐ ์ ์ฐพ๊ธฐ
Args:
max_clusters: ํ์ํ ์ต๋ ํด๋ฌ์คํฐ ์
method: ๋ฐฉ๋ฒ ('elbow', 'silhouette')
Returns:
int: ์ต์ ์ ํด๋ฌ์คํฐ ์
"""
if self.scaled_data is None:
raise ValueError("๋จผ์ scale_data๋ฅผ ํธ์ถํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ค์ผ์ผ๋งํด์ผ ํฉ๋๋ค.")
data = self.scaled_data
if method == 'elbow':
# Elbow method
inertia = []
for k in range(1, max_clusters + 1):
kmeans = KMeans(n_clusters=k, random_state=self.random_state)
kmeans.fit(data)
inertia.append(kmeans.inertia_)
# Visualize the results
plt.figure(figsize=(10, 6))
plt.plot(range(1, max_clusters + 1), inertia, marker='o')
plt.title('Elbow Method for Optimal k')
plt.xlabel('Number of clusters')
plt.ylabel('Inertia')
plt.xticks(range(1, max_clusters + 1))
plt.grid(True)
plt.show()
# Estimate the optimal k (point with the largest change in slope)
k_diff = np.diff(inertia)
k_diff2 = np.diff(k_diff)
optimal_k = np.argmax(np.abs(k_diff2)) + 2  # +2: index offset from the two diff operations
print(f"Optimal number of clusters estimated by the elbow method: {optimal_k}")
elif method == 'silhouette':
# Silhouette method
silhouette_scores = []
for k in range(2, max_clusters + 1):  # silhouette score requires k >= 2
kmeans = KMeans(n_clusters=k, random_state=self.random_state)
labels = kmeans.fit_predict(data)
score = silhouette_score(data, labels)
silhouette_scores.append(score)
# Visualize the results
plt.figure(figsize=(10, 6))
plt.plot(range(2, max_clusters + 1), silhouette_scores, marker='o')
plt.title('Silhouette Method for Optimal k')
plt.xlabel('Number of clusters')
plt.ylabel('Silhouette score')
plt.xticks(range(2, max_clusters + 1))
plt.grid(True)
plt.show()
# Estimate the optimal k (highest silhouette score)
optimal_k = np.argmax(silhouette_scores) + 2  # +2: k starts at 2
print(f"Optimal number of clusters estimated by the silhouette method: {optimal_k}")
else:
raise ValueError("์ง์๋์ง ์๋ ๋ฐฉ๋ฒ์
๋๋ค.")
return optimal_k
def perform_clustering(self, algorithm: str = 'kmeans', params: Optional[Dict[str, Any]] = None) -> np.ndarray:
"""
ํด๋ฌ์คํฐ๋ง ์ํ
Args:
algorithm: ํด๋ฌ์คํฐ๋ง ์๊ณ ๋ฆฌ์ฆ ('kmeans', 'dbscan', 'hierarchical', 'gmm')
params: ์๊ณ ๋ฆฌ์ฆ ํ๋ผ๋ฏธํฐ
Returns:
np.ndarray: ํด๋ฌ์คํฐ ๋ ์ด๋ธ
"""
if self.scaled_data is None:
raise ValueError("๋จผ์ scale_data๋ฅผ ํธ์ถํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ค์ผ์ผ๋งํด์ผ ํฉ๋๋ค.")
data = self.scaled_data
if params is None:
params = {}
if algorithm == 'kmeans':
# K-means clustering
n_clusters = params.get('n_clusters', 3)
model = KMeans(
n_clusters=n_clusters,
random_state=self.random_state,
n_init=params.get('n_init', 10)
)
elif algorithm == 'dbscan':
# DBSCAN clustering
eps = params.get('eps', 0.5)
min_samples = params.get('min_samples', 5)
model = DBSCAN(
eps=eps,
min_samples=min_samples
)
elif algorithm == 'hierarchical':
# Agglomerative (hierarchical) clustering
n_clusters = params.get('n_clusters', 3)
linkage = params.get('linkage', 'ward')
model = AgglomerativeClustering(
n_clusters=n_clusters,
linkage=linkage
)
elif algorithm == 'gmm':
# Gaussian mixture model
n_components = params.get('n_components', 3)
model = GaussianMixture(
n_components=n_components,
random_state=self.random_state
)
else:
raise ValueError("์ง์๋์ง ์๋ ์๊ณ ๋ฆฌ์ฆ์
๋๋ค.")
# ํด๋ฌ์คํฐ๋ง ์ํ
start_time = time.time()
labels = model.fit_predict(data)
duration = time.time() - start_time
# ํด๋ฌ์คํฐ ํต๊ณ
unique_labels = np.unique(labels)
n_clusters = len(unique_labels)
n_noise = 0
if algorithm == 'dbscan':
n_noise = np.sum(labels == -1)
print(f"DBSCAN ํด๋ฌ์คํฐ๋ง ๊ฒฐ๊ณผ: {n_clusters} ํด๋ฌ์คํฐ, {n_noise} ๋
ธ์ด์ฆ ํฌ์ธํธ")
else:
print(f"{algorithm.upper()} ํด๋ฌ์คํฐ๋ง ๊ฒฐ๊ณผ: {n_clusters} ํด๋ฌ์คํฐ")
# Sample counts per cluster
for label in unique_labels:
if label == -1 and algorithm == 'dbscan':
continue
count = np.sum(labels == label)
print(f" ํด๋ฌ์คํฐ {label}: {count} ์ํ ({count/len(labels)*100:.1f}%)")
# ํด๋ฌ์คํฐ๋ง ํ๊ฐ (์ค๋ฃจ์ฃ ์ ์)
if n_clusters > 1 and (algorithm != 'dbscan' or n_noise < len(labels)):
try:
if algorithm == 'dbscan' and n_noise > 0:
# Evaluate excluding the noise points
non_noise_mask = (labels != -1)
silhouette = silhouette_score(data[non_noise_mask], labels[non_noise_mask])
calinski = calinski_harabasz_score(data[non_noise_mask], labels[non_noise_mask])
else:
silhouette = silhouette_score(data, labels)
calinski = calinski_harabasz_score(data, labels)
print(f"์ค๋ฃจ์ฃ ์ ์: {silhouette:.3f} (๋์์๋ก ์ข์, ๋ฒ์: [-1, 1])")
print(f"Calinski-Harabasz ์ ์: {calinski:.3f} (๋์์๋ก ์ข์)")
except Exception as e:
print(f"ํด๋ฌ์คํฐ๋ง ํ๊ฐ ์ค๋ฅ: {e}")
print(f"์์ ์๊ฐ: {duration:.3f}์ด")
# ๊ฒฐ๊ณผ ์ ์ฅ
self.cluster_labels[algorithm] = labels
return labels
def perform_dimension_reduction(self, algorithm: str = 'pca', n_components: int = 2) -> np.ndarray:
"""
์ฐจ์ ์ถ์ ์ํ
Args:
algorithm: ์ฐจ์ ์ถ์ ์๊ณ ๋ฆฌ์ฆ ('pca', 'tsne', 'svd', 'nmf', 'isomap')
n_components: ์ถ์ํ ์ฐจ์ ์
Returns:
np.ndarray: ์ถ์๋ ๋ฐ์ดํฐ
"""
if self.scaled_data is None:
raise ValueError("๋จผ์ scale_data๋ฅผ ํธ์ถํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ค์ผ์ผ๋งํด์ผ ํฉ๋๋ค.")
data = self.scaled_data
if algorithm == 'pca':
# PCA dimensionality reduction
model = PCA(n_components=n_components, random_state=self.random_state)
elif algorithm == 'tsne':
# t-SNE dimensionality reduction
model = TSNE(
n_components=n_components,
random_state=self.random_state,
perplexity=min(30, data.shape[0] - 1)
)
elif algorithm == 'svd':
# Truncated SVD
model = TruncatedSVD(n_components=n_components, random_state=self.random_state)
elif algorithm == 'nmf':
# Non-negative matrix factorization (NMF requires non-negative input, so prefer 'minmax' scaling)
model = NMF(n_components=n_components, random_state=self.random_state)
elif algorithm == 'isomap':
# Isomap
model = Isomap(n_components=n_components)
else:
raise ValueError("์ง์๋์ง ์๋ ์๊ณ ๋ฆฌ์ฆ์
๋๋ค.")
# ์ฐจ์ ์ถ์ ์ํ
start_time = time.time()
reduced_data = model.fit_transform(data)
duration = time.time() - start_time
print(f"{algorithm.upper()} ์ฐจ์ ์ถ์ ๊ฒฐ๊ณผ: {data.shape} โ {reduced_data.shape}")
print(f"์์ ์๊ฐ: {duration:.3f}์ด")
# PCA์ ๊ฒฝ์ฐ ์ค๋ช
๋ ๋ถ์ฐ ๋น์จ ์ถ๋ ฅ
if algorithm == 'pca':
explained_variance = model.explained_variance_ratio_
cumulative_variance = np.cumsum(explained_variance)
print(f"์ค๋ช
๋ ๋ถ์ฐ ๋น์จ: {explained_variance}")
print(f"๋์ ์ค๋ช
๋ ๋ถ์ฐ ๋น์จ: {cumulative_variance[-1]:.3f}")
# ์ค๋ช
๋ ๋ถ์ฐ ๋น์จ ์๊ฐํ
plt.figure(figsize=(10, 6))
plt.bar(range(1, len(explained_variance) + 1), explained_variance, alpha=0.7)
plt.step(range(1, len(cumulative_variance) + 1), cumulative_variance, where='mid', color='red')
plt.ylabel('์ค๋ช
๋ ๋ถ์ฐ ๋น์จ')
plt.xlabel('์ฃผ์ฑ๋ถ')
plt.title('PCA: ์ค๋ช
๋ ๋ถ์ฐ ๋น์จ')
plt.show()
# ๊ฒฐ๊ณผ ์ ์ฅ
self.reduced_data[algorithm] = reduced_data
return reduced_data
def visualize_clusters(self, algorithm: str = 'kmeans', reduction_method: str = 'pca') -> None:
"""
ํด๋ฌ์คํฐ ์๊ฐํ
Args:
algorithm: ํด๋ฌ์คํฐ๋ง ์๊ณ ๋ฆฌ์ฆ ์ด๋ฆ
reduction_method: ์๊ฐํ๋ฅผ ์ํ ์ฐจ์ ์ถ์ ๋ฐฉ๋ฒ
"""
if algorithm not in self.cluster_labels:
raise ValueError(f"'{algorithm}' ํด๋ฌ์คํฐ๋ง์ด ์ํ๋์ง ์์์ต๋๋ค.")
# ์ฐจ์ ์ถ์๋ ๋ฐ์ดํฐ๊ฐ ์๋ ๊ฒฝ์ฐ ์ํ
if reduction_method not in self.reduced_data:
self.perform_dimension_reduction(algorithm=reduction_method, n_components=2)
# ๋ฐ์ดํฐ์ ๋ ์ด๋ธ ์ค๋น
reduced_data = self.reduced_data[reduction_method]
labels = self.cluster_labels[algorithm]
# Set up the color map
unique_labels = np.unique(labels)
n_clusters = len(unique_labels)
colors = cm.tab10(np.linspace(0, 1, max(10, n_clusters)))
# 2D visualization
plt.figure(figsize=(12, 10))
for i, label in enumerate(unique_labels):
if label == -1:  # noise points (DBSCAN)
color = 'black'
marker = 'x'
label_name = 'Noise'
else:
color = colors[i % len(colors)]
marker = 'o'
label_name = f'Cluster {label}'
mask = (labels == label)
plt.scatter(
reduced_data[mask, 0],
reduced_data[mask, 1],
c=[color],
marker=marker,
label=label_name,
alpha=0.7,
s=70
)
plt.title(f'{algorithm.upper()} Clustering with {reduction_method.upper()} Visualization')
plt.xlabel(f'{reduction_method.upper()} Component 1')
plt.ylabel(f'{reduction_method.upper()} Component 2')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()
# 3D visualization (when reduced to 3 or more dimensions)
if reduction_method in self.reduced_data and self.reduced_data[reduction_method].shape[1] >= 3:
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure(figsize=(12, 10))
ax = fig.add_subplot(111, projection='3d')
for i, label in enumerate(unique_labels):
if label == -1:  # noise points (DBSCAN)
color = 'black'
marker = 'x'
label_name = 'Noise'
else:
color = colors[i % len(colors)]
marker = 'o'
label_name = f'Cluster {label}'
mask = (labels == label)
ax.scatter(
reduced_data[mask, 0],
reduced_data[mask, 1],
reduced_data[mask, 2],
c=[color],
marker=marker,
label=label_name,
alpha=0.7,
s=70
)
ax.set_title(f'{algorithm.upper()} Clustering with {reduction_method.upper()} 3D Visualization')
ax.set_xlabel(f'{reduction_method.upper()} Component 1')
ax.set_ylabel(f'{reduction_method.upper()} Component 2')
ax.set_zlabel(f'{reduction_method.upper()} Component 3')
ax.legend()
plt.tight_layout()
plt.show()
def detect_anomalies(self, method: str = 'isolation_forest', contamination: float = 0.05) -> np.ndarray:
"""
์ด์์น ํ์ง
Args:
method: ์ด์์น ํ์ง ๋ฐฉ๋ฒ ('isolation_forest', 'lof')
contamination: ์ด์์น ๋น์จ ์ถ์ ์น
Returns:
np.ndarray: ์ด์์น ๋ ์ด๋ธ (1: ์ ์, -1: ์ด์์น)
"""
if self.scaled_data is None:
raise ValueError("๋จผ์ scale_data๋ฅผ ํธ์ถํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ค์ผ์ผ๋งํด์ผ ํฉ๋๋ค.")
data = self.scaled_data
if method == 'isolation_forest':
# Isolation Forest
model = IsolationForest(
contamination=contamination,
random_state=self.random_state
)
elif method == 'lof':
# Local Outlier Factor
from sklearn.neighbors import LocalOutlierFactor
model = LocalOutlierFactor(
n_neighbors=20,
contamination=contamination
)
else:
raise ValueError("์ง์๋์ง ์๋ ์ด์์น ํ์ง ๋ฐฉ๋ฒ์
๋๋ค.")
# ์ด์์น ํ์ง ์ํ
start_time = time.time()
if method == 'lof':
# LOF must be fitted and predicted in a single fit_predict call
labels = model.fit_predict(data)
else:
model.fit(data)
labels = model.predict(data)
duration = time.time() - start_time
# Outlier statistics
n_samples = data.shape[0]
n_outliers = np.sum(labels == -1)
outlier_ratio = n_outliers / n_samples
print(f"{method.upper()} ์ด์์น ํ์ง ๊ฒฐ๊ณผ:")
print(f" ์ ์ฒด ์ํ: {n_samples}")
print(f" ์ด์์น: {n_outliers} ({outlier_ratio:.1%})")
print(f" ์ ์ ๋ฐ์ดํฐ: {n_samples - n_outliers} ({1 - outlier_ratio:.1%})")
print(f"์์ ์๊ฐ: {duration:.3f}์ด")
# ๊ฒฐ๊ณผ ์ ์ฅ
self.cluster_labels[method] = labels
return labels
def visualize_anomalies(self, method: str = 'isolation_forest', reduction_method: str = 'pca') -> None:
"""
์ด์์น ์๊ฐํ
Args:
method: ์ด์์น ํ์ง ๋ฐฉ๋ฒ
reduction_method: ์๊ฐํ๋ฅผ ์ํ ์ฐจ์ ์ถ์ ๋ฐฉ๋ฒ
"""
if method not in self.cluster_labels:
raise ValueError(f"'{method}' ์ด์์น ํ์ง๊ฐ ์ํ๋์ง ์์์ต๋๋ค.")
# ์ฐจ์ ์ถ์๋ ๋ฐ์ดํฐ๊ฐ ์๋ ๊ฒฝ์ฐ ์ํ
if reduction_method not in self.reduced_data:
self.perform_dimension_reduction(algorithm=reduction_method, n_components=2)
# Prepare the data and labels
reduced_data = self.reduced_data[reduction_method]
labels = self.cluster_labels[method]
# 2D visualization
plt.figure(figsize=(12, 10))
# Normal samples
normal_mask = (labels == 1)
plt.scatter(
reduced_data[normal_mask, 0],
reduced_data[normal_mask, 1],
c='blue',
marker='o',
label='Normal',
alpha=0.5
)
# Anomalies
outlier_mask = (labels == -1)
plt.scatter(
reduced_data[outlier_mask, 0],
reduced_data[outlier_mask, 1],
c='red',
marker='x',
label='Anomaly',
alpha=0.7,
s=100
)
plt.title(f'{method.upper()} Anomaly Detection with {reduction_method.upper()} Visualization')
plt.xlabel(f'{reduction_method.upper()} Component 1')
plt.ylabel(f'{reduction_method.upper()} Component 2')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()
# Usage example
if __name__ == "__main__":
# Prepare the data
from sklearn.datasets import make_blobs
# Generate sample data
X, y = make_blobs(
n_samples=1000,
n_features=10,
centers=5,
cluster_std=1.0,
random_state=42
)
# Create the unsupervised learning object
unsupervised = UnsupervisedLearning(random_state=42)
# Load and scale the data
unsupervised.load_data(X)
unsupervised.scale_data()
# Find the optimal number of clusters
optimal_k = unsupervised.find_optimal_clusters(max_clusters=10, method='silhouette')
# Perform clustering
labels = unsupervised.perform_clustering(
algorithm='kmeans',
params={'n_clusters': optimal_k}
)
# Perform dimensionality reduction
reduced_data = unsupervised.perform_dimension_reduction(algorithm='pca', n_components=2)
# Visualize the clusters
unsupervised.visualize_clusters(algorithm='kmeans', reduction_method='pca')
# Detect anomalies
anomaly_labels = unsupervised.detect_anomalies(method='isolation_forest', contamination=0.05)
# Visualize the anomalies
unsupervised.visualize_anomalies(method='isolation_forest', reduction_method='pca')
✅ Features:
- Supports multiple clustering algorithms (K-means, DBSCAN, hierarchical clustering, GMM)
- Several dimensionality reduction techniques (PCA, t-SNE, SVD, NMF, Isomap)
- Anomaly detection algorithms (Isolation Forest, LOF)
- Methods for finding the optimal number of clusters (elbow, silhouette)
- Clustering evaluation metrics (silhouette score, Calinski-Harabasz score)
- 2D and 3D visualization
- Measures algorithm performance and elapsed time
- Modular structure for easy extension and reuse
- Type hints for better code readability
- Clear visualization of clusters and anomalies
from sklearn.model_selection import cross_val_score
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
# Cross-validation (X_train_scaled and y_train come from the earlier supervised example)
rf_model = RandomForestClassifier(random_state=42)
cv_scores = cross_val_score(rf_model, X_train_scaled, y_train, cv=5)
print(f"Cross-validation score: {cv_scores.mean():.3f} (+/- {cv_scores.std() * 2:.3f})")
# Grid search
param_grid = {
'n_estimators': [100, 200, 300],
'max_depth': [10, 20, 30, None]
}
grid_search = GridSearchCV(
RandomForestClassifier(random_state=42),
param_grid,
cv=5
)
grid_search.fit(X_train_scaled, y_train)
print(f"Best parameters: {grid_search.best_params_}")
✅ Features:
- Cross-validation
- Hyperparameter tuning
- Performance evaluation
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
# Preprocessing and modeling pipeline
pipeline = Pipeline([
('imputer', SimpleImputer(strategy='mean')),
('scaler', StandardScaler()),
('classifier', RandomForestClassifier())
])
# Run the pipeline (X_train, y_train, X_test, y_test come from the earlier example)
pipeline.fit(X_train, y_train)
pipeline_pred = pipeline.predict(X_test)
# Evaluate the pipeline
print(classification_report(y_test, pipeline_pred))
✅ Features:
- Automated preprocessing
- Chained model steps
- Evaluation report
✅ Best practices (a sketch of two of these items follows the list):
- Take data preprocessing seriously
- Use cross-validation
- Tune hyperparameters
- Build pipelines
- Choose appropriate evaluation metrics
- Prevent overfitting
- Apply feature selection and engineering
- Handle imbalanced data
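As a hedged sketch of imbalanced-data handling and metric choice from the list above, the snippet below uses a synthetic imbalanced dataset from make_classification; the class ratio and scoring choice are illustrative assumptions, not a prescription:

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.linear_model import LogisticRegression

# Synthetic dataset with roughly a 9:1 class imbalance
X, y = make_classification(n_samples=2000, weights=[0.9, 0.1], random_state=42)

# A stratified split keeps the class ratio in both sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

# class_weight='balanced' reweights samples inversely to class frequency
clf = LogisticRegression(class_weight='balanced', max_iter=1000)

# Cross-validated F1 is more informative than accuracy on imbalanced data
scores = cross_val_score(clf, X_train, y_train, cv=5, scoring='f1')
print(f"CV F1: {scores.mean():.3f} (+/- {scores.std() * 2:.3f})")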
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report
class TextClassifier:
def __init__(self):
self.pipeline = Pipeline([
('tfidf', TfidfVectorizer()),
('classifier', MultinomialNB())
])
def train(self, texts, labels):
self.pipeline.fit(texts, labels)
def predict(self, texts):
return self.pipeline.predict(texts)
def evaluate(self, texts, true_labels):
pred_labels = self.predict(texts)
return classification_report(true_labels, pred_labels)
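A hypothetical usage example for the class above; the toy texts and spam/ham labels are made up purely for illustration:

texts = [
"free prize waiting for you",
"meeting rescheduled to friday",
"win money now",
"project update attached",
]
labels = ["spam", "ham", "spam", "ham"]

clf = TextClassifier()
clf.train(texts, labels)
print(clf.predict(["claim your free money"]))  # likely ['spam']
print(clf.evaluate(texts, labels))             # report on the training texts themselves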
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
class AnomalyDetector:
def __init__(self, contamination=0.1):
self.scaler = StandardScaler()
self.detector = IsolationForest(
contamination=contamination,
random_state=42
)
def fit(self, data):
scaled_data = self.scaler.fit_transform(data)
self.detector.fit(scaled_data)
def predict(self, data):
scaled_data = self.scaler.transform(data)
predictions = self.detector.predict(scaled_data)
return predictions == -1 # True for anomalies
def get_anomaly_scores(self, data):
scaled_data = self.scaler.transform(data)
return -self.detector.score_samples(scaled_data)
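A hypothetical usage example for the detector above, with synthetic 2D data where the injected outliers are obvious by construction:

import numpy as np

rng = np.random.RandomState(42)
normal = rng.normal(0, 1, size=(200, 2))     # dense cluster of normal points
outliers = rng.uniform(-6, 6, size=(10, 2))  # scattered far-away points
data = np.vstack([normal, outliers])

detector = AnomalyDetector(contamination=0.05)
detector.fit(data)
flags = detector.predict(data)               # boolean array, True marks anomalies
scores = detector.get_anomaly_scores(data)   # higher score = more anomalous
print(f"Flagged {flags.sum()} of {len(data)} points as anomalies")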
- Take data preprocessing seriously
- Use cross-validation
- Tune hyperparameters
- Build pipelines
- Choose appropriate evaluation metrics
- Prevent overfitting
- Apply feature selection and engineering
- Handle imbalanced data
- Save and load models
- Design for scalability