# Source code for mlbox.optimisation.optimiser

# coding: utf-8
# Author: Axel ARONIO DE ROMBLAY <axelderomblay@gmail.com>
# License: BSD 3 clause

import numpy as np
import pandas as pd
import warnings
import time

from hyperopt import fmin, hp, tpe
from sklearn.model_selection import cross_val_score, KFold, StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.metrics import SCORERS, make_scorer, roc_auc_score

from ..encoding.na_encoder import NA_encoder
from ..encoding.categorical_encoder import Categorical_encoder
from ..model.classification.feature_selector import Clf_feature_selector
from ..model.regression.feature_selector import Reg_feature_selector
from ..model.classification.stacking_classifier import StackingClassifier
from ..model.regression.stacking_regressor import StackingRegressor
from ..model.classification.classifier import Classifier
from ..model.regression.regressor import Regressor


class Optimiser():
    """Optimises hyper-parameters of the whole Pipeline.

    - NA encoder (missing values encoder)
    - CA encoder (categorical features encoder)
    - Feature selector (OPTIONAL)
    - Stacking estimator - feature engineer (OPTIONAL)
    - Estimator (classifier or regressor)

    Works for both regression and classification (multiclass or binary) tasks.

    Parameters
    ----------
    scoring : str, callable or None. default: None
        A string or a scorer callable object.

        If None, "neg_log_loss" is used for classification and
        "neg_mean_squared_error" for regression

        Available scorings can be found in the module sklearn.metrics:
        https://scikit-learn.org/stable/modules/model_evaluation.html#the-scoring-parameter-defining-model-evaluation-rules

    n_folds : int, default = 2
        The number of folds for cross validation (stratified for
        classification)

    random_state : int, default = 1
        Pseudo-random number generator state used for shuffling

    to_path : str, default = "save"
        Name of the folder where models are saved

    verbose : bool, default = True
        Verbose mode
    """

    def __init__(self, scoring=None, n_folds=2, random_state=1,
                 to_path="save", verbose=True):
        self.scoring = scoring
        self.n_folds = n_folds
        self.random_state = random_state
        self.to_path = to_path
        self.verbose = verbose

        warnings.warn("Optimiser will save all your fitted models into directory '"
                      + str(self.to_path) + "/joblib'. Please clear it regularly.")

    def get_params(self, deep=True):
        """Return the constructor parameters as a dict.

        Parameters
        ----------
        deep : bool, default = True
            Unused by this method; accepted for signature compatibility.

        Returns
        -------
        dict
            Keys are 'scoring', 'n_folds', 'random_state', 'to_path' and
            'verbose'.
        """
        return {'scoring': self.scoring,
                'n_folds': self.n_folds,
                'random_state': self.random_state,
                'to_path': self.to_path,
                'verbose': self.verbose}

    def set_params(self, **params):
        """Set constructor parameters.

        Unknown parameter names are not set: a warning naming the offending
        parameter is emitted and the parameter is ignored.

        Parameters
        ----------
        **params
            Parameter names (see `get_params`) mapped to their new values.
        """
        # Kept from the original implementation; nothing in this class
        # reads this (name-mangled) flag.
        self.__fitOK = False

        valid_names = self.get_params()
        for k, v in params.items():
            if k not in valid_names:
                warnings.warn("Invalid parameter " + str(k)
                              + " for optimiser Optimiser. "
                              "Parameter IGNORED. Check the list of available "
                              "parameters with `optimiser.get_params().keys()`")
            else:
                setattr(self, k, v)

    @staticmethod
    def _optional_steps(params, selector_cls, stacking_cls):
        """Build the optional feature selector and stacking layers.

        Returns ``(fs, layers)``: ``fs`` is a ``selector_cls`` instance if any
        key of ``params`` starts with "fs__" (else None); ``layers`` maps each
        "stck..." step prefix found in ``params`` to a ``stacking_cls``
        instance.
        """
        if params is None:
            return None, {}

        fs = None
        if any(p.startswith("fs__") for p in params):
            fs = selector_cls()

        # TODO: Check if p.split("__")[1] instead?
        layers = {p.split("__")[0]: stacking_cls(verbose=False)
                  for p in params if p.startswith("stck")}
        return fs, layers

    def _set_classification_scoring(self, n_classes):
        """Resolve ``self.scoring`` in place for a classification task."""
        if self.scoring is None:
            self.scoring = 'neg_log_loss'  # works also for multiclass pb
            return

        if not isinstance(self.scoring, str):
            return  # scorer callable: used as given

        if self.scoring not in SCORERS:
            warnings.warn("Unknown or invalid scoring metric. "
                          "neg_log_loss is used instead.")
            self.scoring = 'neg_log_loss'
            return

        if n_classes <= 2:
            return  # binary classification: metric used as given

        # multiclass classification
        warnings.warn("This is a multiclass problem. "
                      "Please make sure that your scoring metric is "
                      "appropriate.")

        if self.scoring + "_weighted" in SCORERS:
            warnings.warn("Weighted strategy for the scoring metric is used.")
            self.scoring = self.scoring + "_weighted"
        elif self.scoring == "roc_auc":
            # specific scenario: the target is one-hot encoded before
            # computing the AUC on predicted probabilities
            self.scoring = make_scorer(lambda y_true, y_pred: roc_auc_score(pd.get_dummies(y_true), y_pred),  # noqa
                                       greater_is_better=True,
                                       needs_proba=True)

    def _set_regression_scoring(self):
        """Resolve ``self.scoring`` in place for a regression task."""
        if self.scoring is None:
            self.scoring = "neg_mean_squared_error"
        elif isinstance(self.scoring, str) and self.scoring not in SCORERS:
            warnings.warn("Unknown or invalid scoring metric. "
                          "neg_mean_squared_error is used instead.")
            self.scoring = 'neg_mean_squared_error'

    @staticmethod
    def _format_fold_scores(scores):
        """Format per-fold scores as " (fold 1 = s1, fold 2 = s2, ...)"."""
        return " (" + ", ".join("fold " + str(i + 1) + " = " + str(s)
                                for i, s in enumerate(scores)) + ")"

    def evaluate(self, params, df):
        """Evaluates the data.

        Evaluates the data with a given scoring function and given
        hyper-parameters of the whole pipeline. If no parameters are set,
        default configuration for each step is evaluated : no feature
        selection is applied and no meta features are created.

        Note that ``self.scoring`` is updated in place: a default metric is
        set when it is None or unknown, and it may be adapted for multiclass
        problems.

        Parameters
        ----------
        params : dict, default = None.
            Hyper-parameters dictionary for the whole pipeline.

            - The keys must respect the following syntax : "enc__param".

                - "enc" = "ne" for na encoder
                - "enc" = "ce" for categorical encoder
                - "enc" = "fs" for feature selector [OPTIONAL]
                - "enc" = "stck"+str(i) to add layer n°i of meta-features [OPTIONAL]
                - "enc" = "est" for the final estimator

                - "param" : a correct associated parameter for each step. Ex: "max_depth" for "enc"="est", ...

            - The values are those of the parameters. Ex: 4 for key = "est__max_depth", ...

        df : dict, default = None
            Dataset dictionary. Must contain keys and values:

            - "train": pandas DataFrame for the train set.
            - "target" : encoded pandas Serie for the target on train set
              (with a float dtype for a regression or an integer dtype for
              a classification). Indexes should match the train set.

        Returns
        -------
        float.
            The score. The higher the better.
            Positive for a score and negative for a loss.
            ``-np.inf`` when the cross validation itself failed.

        Raises
        ------
        ValueError
            If the target dtype is neither integer nor float, if fewer than
            two classes have at least ``n_folds`` samples, or if the
            pipeline cannot be configured with ``params``.

        Examples
        --------
        >>> from mlbox.optimisation import *
        >>> from sklearn.datasets import load_boston
        >>> #load data
        >>> dataset = load_boston()
        >>> #evaluating the pipeline
        >>> opt = Optimiser()
        >>> params = {
        ...     "ne__numerical_strategy" : 0,
        ...     "ce__strategy" : "label_encoding",
        ...     "fs__threshold" : 0.1,
        ...     "stck__base_estimators" : [Regressor(strategy="RandomForest"), Regressor(strategy="ExtraTrees")],
        ...     "est__strategy" : "Linear"
        ... }
        >>> df = {"train" : pd.DataFrame(dataset.data), "target" : pd.Series(dataset.target)}
        >>> opt.evaluate(params, df)
        """
        target = df['target']

        ##########################################
        #    Automatically checking the task
        ##########################################

        # Any integer / float width is accepted: comparing the dtype with
        # 'int' / 'float' only matches the platform default width.
        if pd.api.types.is_integer_dtype(target.dtype):
            classification = True
        elif pd.api.types.is_float_dtype(target.dtype):
            classification = False
        else:
            raise ValueError("Impossible to determine the task. "
                             "Please check that your target is encoded.")

        if classification:
            # Classes with fewer samples than folds cannot be stratified:
            # their rows are dropped before cross validation.
            counts = target.value_counts()
            classes_to_drop = counts[counts < self.n_folds].index
            indexes_to_drop = target[target.isin(classes_to_drop)].index
            n_classes = len(counts) - len(classes_to_drop)

            # Also covers the case where every class is too rare (0 left).
            if n_classes < 2:
                raise ValueError("Your target has not enough classes. You can't run the optimiser")

            cv = StratifiedKFold(n_splits=self.n_folds,
                                 shuffle=True,
                                 random_state=self.random_state)
            est = Classifier()
            fs, STCK = self._optional_steps(params,
                                            Clf_feature_selector,
                                            StackingClassifier)
            self._set_classification_scoring(n_classes)
        else:
            indexes_to_drop = []
            cv = KFold(n_splits=self.n_folds,
                       shuffle=True,
                       random_state=self.random_state)
            est = Regressor()
            fs, STCK = self._optional_steps(params,
                                            Reg_feature_selector,
                                            StackingRegressor)
            self._set_regression_scoring()

        ne = NA_encoder()
        ce = Categorical_encoder()
        stck_names = sorted(STCK)

        ##########################################
        #         Creating the Pipeline
        ##########################################

        # Do we need to cache transformers?
        cache = len(STCK) != 0
        if params is not None:
            if params.get("ce__strategy") == "entity_embedding":
                cache = True
            if (fs is not None and "fs__strategy" in params
                    and params["fs__strategy"] != "variance"):
                cache = True

        # Pipeline creation
        pipe = [("ne", ne), ("ce", ce)]
        if fs is not None:
            pipe.append(("fs", fs))
        for stck in stck_names:
            pipe.append((stck, STCK[stck]))
        pipe.append(("est", est))

        if cache:
            pp = Pipeline(pipe, memory=self.to_path)
        else:
            pp = Pipeline(pipe)

        ##########################################
        #          Fitting the Pipeline
        ##########################################

        start_time = time.time()

        if params is None:
            # No params : default configuration
            print('No parameters set. Default configuration is tested')
        else:
            try:
                pp = pp.set_params(**params)
            except Exception as err:
                raise ValueError("Pipeline cannot be set with these parameters."
                                 " Check the name of your stages.") from err

        if self.verbose:
            print("")
            print("#####################################################"
                  " testing hyper-parameters... "
                  "#####################################################")
            print("")
            print(">>> NA ENCODER :" + str(ne.get_params()))
            print("")
            print(">>> CA ENCODER :" + str({'strategy': ce.strategy}))

            if fs is not None:
                print("")
                print(">>> FEATURE SELECTOR :" + str(fs.get_params()))

            for i, stck in enumerate(stck_names):
                stck_params = STCK[stck].get_params().copy()
                stck_params_display = {k: v for k, v in stck_params.items()
                                       if k not in ["level_estimator",
                                                    "verbose",
                                                    "base_estimators"]}
                print("")
                print(">>> STACKING LAYER n°" + str(i + 1) + " :"
                      + str(stck_params_display))

                for j, model in enumerate(stck_params["base_estimators"]):
                    print("")
                    print(" > base_estimator n°" + str(j + 1) + " :"
                          + str(dict(list(model.get_params().items())
                                     + list(model.get_estimator().get_params().items()))))

            print("")
            print(">>> ESTIMATOR :" + str(
                dict(list(est.get_params().items())
                     + list(est.get_estimator().get_params().items()))
            ))
            print("")

        try:
            # Computing the mean cross validation score across the folds
            scores = cross_val_score(estimator=pp,
                                     X=df['train'].drop(indexes_to_drop),
                                     y=df['target'].drop(indexes_to_drop),
                                     scoring=self.scoring,
                                     cv=cv)
            score = np.mean(scores)
        except Exception:
            # Best effort: a failing configuration gets the worst possible
            # score instead of aborting a whole optimisation run.
            scores = [-np.inf for _ in range(self.n_folds)]
            score = -np.inf

        if score == -np.inf:
            warnings.warn("An error occurred while computing the cross "
                          "validation mean score. "
                          "Please check that the parameter values are correct "
                          "and that your scoring function is valid and appropriate to the task.")

        ##########################################
        #             Reporting scores
        ##########################################

        if self.verbose:
            print("")
            print("MEAN SCORE : " + str(self.scoring) + " = " + str(score))
            print("VARIANCE : " + str(np.std(scores))
                  + self._format_fold_scores(scores))
            print("CPU time: %s seconds" % (time.time() - start_time))
            print("")

        return score

    def optimise(self, space, df, max_evals=40):
        """Optimises the Pipeline.

        Optimises hyper-parameters of the whole Pipeline with a given scoring
        function. Algorithm used to optimize : Tree Parzen Estimator.

        IMPORTANT : Try to avoid dependent parameters and to set one feature
        selection strategy and one estimator strategy at a time.

        Parameters
        ----------
        space : dict, default = None.
            Hyper-parameters space:

            - The keys must respect the following syntax : "enc__param".

                - "enc" = "ne" for na encoder
                - "enc" = "ce" for categorical encoder
                - "enc" = "fs" for feature selector [OPTIONAL]
                - "enc" = "stck"+str(i) to add layer n°i of meta-features [OPTIONAL]
                - "enc" = "est" for the final estimator

                - "param" : a correct associated parameter for each step. Ex: "max_depth" for "enc"="est", ...

            - The values must respect the syntax: {"search":strategy,"space":list}

                - "strategy" = "choice" or "uniform". Default = "choice"
                - list : a list of values to be tested if strategy="choice". Else, list = [value_min, value_max].

        df : dict, default = None
            Dataset dictionary. Must contain keys and values:

            - "train": pandas DataFrame for the train set.
            - "target" : encoded pandas Serie for the target on train set
              (with a float dtype for a regression or an integer dtype for
              a classification). Indexes should match the train set.

        max_evals : int, default = 40.
            Number of iterations.
            For an accurate optimal hyper-parameter, max_evals = 40.

        Returns
        -------
        dict.
            The optimal hyper-parameter dictionary. Empty when ``space`` is
            None or empty.

        Raises
        ------
        ValueError
            If an entry of ``space`` has no "space" key or an unknown
            "search" strategy.

        Examples
        --------
        >>> from mlbox.optimisation import *
        >>> from sklearn.datasets import load_boston
        >>> #loading data
        >>> dataset = load_boston()
        >>> #optimising the pipeline
        >>> opt = Optimiser()
        >>> space = {
        ...     'fs__strategy':{"search":"choice","space":["variance","rf_feature_importance"]},
        ...     'est__colsample_bytree':{"search":"uniform", "space":[0.3,0.7]}
        ... }
        >>> df = {"train" : pd.DataFrame(dataset.data), "target" : pd.Series(dataset.target)}
        >>> best = opt.optimise(space, df, 3)
        """
        if space is None or len(space) == 0:
            warnings.warn(
                "Space is empty. Please define a search space. "
                "Otherwise, call the method 'evaluate' for custom settings")
            return dict()

        # Creating a correct space for hyperopt
        hyper_space = {}
        for p, spec in space.items():
            if "space" not in spec:
                raise ValueError("You must give a space list ie values"
                                 " for hyper parameter " + p + ".")

            search = spec["search"] if "search" in spec else "choice"

            if search == "uniform":
                bounds = np.sort(spec["space"])
                hyper_space[p] = hp.uniform(p, bounds[0], bounds[-1])
            elif search == "choice":
                hyper_space[p] = hp.choice(p, spec["space"])
            else:
                raise ValueError(
                    "Invalid search strategy "
                    "for hyper parameter " + p + ". Please"
                    " choose between 'choice' and 'uniform'.")

        # hyperopt minimises its objective whereas evaluate returns a score
        # to maximise, hence the sign flip.
        best_params = fmin(lambda params: -self.evaluate(params, df),
                           space=hyper_space,
                           algo=tpe.suggest,
                           max_evals=max_evals)

        # For "choice" parameters the reported value is used as an index
        # into the candidate list to recover the actual value.
        resolved = {}
        for p, v in best_params.items():
            is_choice = ("search" not in space[p]
                         or space[p]["search"] == "choice")
            resolved[p] = space[p]["space"][v] if is_choice else v
        best_params = resolved

        if self.verbose:
            print("")
            print("")
            print("~" * 137)
            print("~" * 57 + " BEST HYPER-PARAMETERS " + "~" * 57)
            print("~" * 137)
            print("")
            print(best_params)

        return best_params