Source code for mlbox.preprocessing.reader

# coding: utf-8
# Author: Axel ARONIO DE ROMBLAY <axelderomblay@gmail.com>
# License: BSD 3 clause
import sys
import pickle
import os
import time
import warnings
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from joblib import Parallel, delayed


def convert_list(serie):

    """Converts lists in a pandas serie into a dataframe
    where which element of a list is a column

    Parameters
    ----------
    serie : pandas Serie
        The serie you want to cast into a dataframe

    Returns
    -------
    pandas DataFrame
        The converted dataframe
    """

    import numpy
    import pandas

    if (serie.apply(lambda x: type(x) == list).sum() > 0):

        serie = serie.apply(lambda x: [x] if type(x) != list else x)
        # Keep at most the 90th percentile of the list lengths
        cut = int(numpy.percentile(serie.apply(len), 90))  # TODO: To test

        serie = serie.apply(lambda x: x[:cut])

        return pandas.DataFrame(serie.tolist(),
                                index=serie.index,
                                columns=[serie.name + "_item" + str(i + 1)
                                         for i in range(cut)]
                                )

    else:

        return serie
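
# --- Illustration only (not part of the original module) --------------------
# A minimal sketch of what convert_list produces on a toy series; the series
# name "tags" is arbitrary. Columns follow the "<name>_item<i>" pattern built
# above, and shorter lists are padded with missing values:
#
#   s = pd.Series([[1, 2], [3], [4, 5]], name="tags")
#   convert_list(s)
#   # -> DataFrame with columns "tags_item1" and "tags_item2";
#   #    the shorter list [3] is padded with NaN in "tags_item2"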


def convert_float_and_dates(serie):

    """Converts into float if possible and converts dates.

    Creates timestamp from 01/01/2017, year, month, day, day_of_week and hour

    Parameters
    ----------
    serie : pandas Serie
        The serie you want to convert

    Returns
    -------
    pandas DataFrame
        The converted dataframe
    """

    import pandas

    # dtype is already a date

    if (serie.dtype == 'datetime64[ns]'):

        df = pandas.DataFrame([], index=serie.index)
        df[serie.name + "_TIMESTAMP"] = (pandas.DatetimeIndex(serie) -
                                         pandas.datetime(2017, 1, 1)
                                         ).total_seconds()

        df[serie.name + "_YEAR"] = pandas.DatetimeIndex(serie).year.astype(  # noqa
            float)  # TODO: be careful with nan ! object or float ??

        df[serie.name + "_MONTH"] = pandas.DatetimeIndex(serie).month.astype(  # noqa
            float)  # TODO: be careful with nan ! object or float ??

        df[serie.name + "_DAY"] = pandas.DatetimeIndex(serie).day.astype(
            float)  # TODO: be careful with nan ! object or float ??

        df[serie.name + "_DAYOFWEEK"] = pandas.DatetimeIndex(serie).dayofweek.astype(  # noqa
            float)  # TODO: be careful with nan ! object or float ??

        df[serie.name + "_HOUR"] = pandas.DatetimeIndex(serie).hour.astype(float) + \
                                   pandas.DatetimeIndex(serie).minute.astype(float)/60. + \
                                   pandas.DatetimeIndex(serie).second.astype(float)/3600.

        return df

    else:

        # Convert float

        try:
            serie = serie.apply(float)

        except:
            pass

        # Cleaning/converting dates

        if (serie.dtype != 'object'):
            return serie

        else:
            # trying to cast into date
            df = pandas.DataFrame([], index=serie.index)

            try:

                serie_to_df = pandas.DatetimeIndex(pd.to_datetime(serie))

                df[serie.name + "_TIMESTAMP"] = (serie_to_df -
                                                 pandas.datetime(2017, 1, 1)
                                                 ).total_seconds()

                df[serie.name + "_YEAR"] = serie_to_df.year.astype(
                    float)  # TODO: be careful with nan ! object or float??

                df[serie.name + "_MONTH"] = serie_to_df.month.astype(
                    float)  # TODO: be careful with nan ! object or float??

                df[serie.name + "_DAY"] = serie_to_df.day.astype(
                    float)  # TODO: be careful with nan ! object or float??

                df[serie.name + "_DAYOFWEEK"] = serie_to_df.dayofweek.astype(
                    float)  # TODO: be careful with nan ! object or float??

                df[serie.name + "_HOUR"] = serie_to_df.hour.astype(float) + \
                                           serie_to_df.minute.astype(float)/60. + \
                                           serie_to_df.second.astype(float) / 3600.

                return df

            except:

                return serie
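
# --- Illustration only (not part of the original module) --------------------
# A rough sketch of the expansion performed above, assuming a series of
# parseable date strings (the series name "created" is arbitrary):
#
#   s = pd.Series(["2017-03-01 12:30:00", "2018-01-15"], name="created")
#   convert_float_and_dates(s)
#   # -> DataFrame with columns "created_TIMESTAMP" (seconds elapsed since
#   #    2017-01-01), "created_YEAR", "created_MONTH", "created_DAY",
#   #    "created_DAYOFWEEK" and "created_HOUR"
#
# A series of numeric strings such as pd.Series(["1", "2.5"]) is simply cast
# to float and returned as a Series.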


class Reader():

    """Reads and cleans data

    Parameters
    ----------
    sep : str, default = None
        Delimiter to use when reading a csv file.

    header : int or None, default = 0
        If header=0, the first line is considered as a header.
        Otherwise, there is no header. Useful for csv and xls files.

    to_hdf5 : bool, default = False
        If True, dumps each file to hdf5 format.

    to_path : str, default = "save"
        Name of the folder where files and encoders are saved.

    verbose : bool, default = True
        Verbose mode
    """

    def __init__(self,
                 sep=None,
                 header=0,
                 to_hdf5=False,
                 to_path="save",
                 verbose=True):

        self.sep = sep
        self.header = header
        self.to_hdf5 = to_hdf5
        self.to_path = to_path
        self.verbose = verbose

    def clean(self, path, drop_duplicate=False):

        """Reads and cleans data (accepted formats : csv, xls, json and h5):

        - deletes unnamed columns
        - casts lists into variables
        - tries to cast variables into float
        - cleans dates and extracts timestamp from 01/01/2017, year, month,
          day, day_of_week and hour
        - drops duplicates (if drop_duplicate=True)

        Parameters
        ----------
        path : str
            The path to the dataset.

        drop_duplicate: bool, default = False
            If True, drops duplicates when reading each file.

        Returns
        -------
        pandas dataframe
            Cleaned dataset.
        """

        ##############################################################
        # Reading
        ##############################################################

        start_time = time.time()

        if (path is None):

            raise ValueError("You must specify the path to load the data")

        else:

            type_doc = path.split(".")[-1]

            if (type_doc == 'csv'):

                if (self.sep is None):
                    raise ValueError("You must specify the separator "
                                     "for a csv file")
                else:
                    if (self.verbose):
                        print("")
                        print("reading csv : " + path.split("/")[-1] + " ...")
                    df = pd.read_csv(path,
                                     sep=self.sep,
                                     header=self.header,
                                     engine='c',
                                     error_bad_lines=False)

            elif (type_doc == 'xls'):

                if (self.verbose):
                    print("")
                    print("reading xls : " + path.split("/")[-1] + " ...")
                df = pd.read_excel(path, header=self.header)

            elif (type_doc == 'h5'):

                if (sys.platform == "win32"
                        and sys.version_info[0] <= 3
                        and sys.version_info[1] <= 5):
                    raise ValueError("h5 format not supported for python "
                                     "under 3.6 on windows. "
                                     "Please upgrade python")

                if (self.verbose):
                    print("")
                    print("reading hdf5 : " + path.split("/")[-1] + " ...")

                df = pd.read_hdf(path)

            elif (type_doc == 'json'):

                if (sys.platform == "win32"
                        and sys.version_info[0] <= 3
                        and sys.version_info[1] <= 5):
                    raise ValueError("json format not supported for python "
                                     "under 3.6 on windows. "
                                     "Please upgrade python")

                if (self.verbose):
                    print("")
                    print("reading json : " + path.split("/")[-1] + " ...")

                df = pd.read_json(path)

            else:

                raise ValueError("The document extension cannot be handled")

        # Deleting unknown column
        try:
            del df["Unnamed: 0"]
        except:
            pass

        ##############################################################
        # Cleaning lists, floats and dates
        ##############################################################

        if (self.verbose):
            print("cleaning data ...")

        if (sys.platform == "win32"):
            df = pd.concat([convert_list(df[col]) for col in df.columns],
                           axis=1)
            df = pd.concat([convert_float_and_dates(df[col])
                            for col in df.columns],
                           axis=1)
        else:
            df = pd.concat(Parallel(n_jobs=-1)(delayed(convert_list)(df[col])
                                               for col in df.columns),
                           axis=1)
            df = pd.concat(Parallel(n_jobs=-1)(
                delayed(convert_float_and_dates)(df[col])
                for col in df.columns),
                axis=1)

        # Drop duplicates
        if (drop_duplicate):
            if (self.verbose):
                print("dropping duplicates")
            df = df.drop_duplicates()
        else:
            pass

        if (self.verbose):
            print("CPU time: %s seconds" % (time.time() - start_time))

        return df
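
    # --- Illustration only (not part of the original module) ----------------
    # A minimal usage sketch for clean(), assuming a hypothetical csv file
    # "data/train.csv" exists:
    #
    #   reader = Reader(sep=",")
    #   df = reader.clean("data/train.csv", drop_duplicate=True)
    #   # unnamed columns are dropped, list columns are expanded, numeric
    #   # strings are cast to float and date columns are converted into
    #   # *_TIMESTAMP / *_YEAR / ... features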

    def train_test_split(self, Lpath, target_name):

        """Creates train and test datasets

        Given a list of several paths and a target name, automatically
        creates and cleans train and test datasets.
        IMPORTANT: a dataset is considered as a test set if it does not
        contain the target value. Otherwise it is considered as part of a
        train set.
        Also determines the task and encodes the target (classification
        problem only).
        Finally dumps the datasets to hdf5 if to_hdf5=True, and dumps the
        target encoder for classification tasks.

        Parameters
        ----------
        Lpath : list, default = None
            List of str paths to load the data

        target_name : str, default = None
            The name of the target. Works for both classification
            (multiclass or not) and regression.

        Returns
        -------
        dict
            Dictionary containing :

            - 'train' : pandas dataframe for train dataset
            - 'test' : pandas dataframe for test dataset
            - 'target' : encoded pandas Series for the target on train set
              (with dtype='float' for a regression or dtype='int' for a
              classification)
        """

        col = []
        col_train = []
        col_test = []

        df_train = dict()
        df_test = dict()
        y_train = dict()

        if (type(Lpath) != list):

            raise ValueError("You must specify a list of paths "
                             "to load all the data")

        elif (self.to_path is None):

            raise ValueError("You must specify a path to save your data "
                             "and make sure your files are not already saved")

        else:

            ##############################################################
            # Reading the files
            ##############################################################

            for path in Lpath:

                # Reading each file
                df = self.clean(path, drop_duplicate=False)

                # Checking if the target exists to split into test and train
                if (target_name in df.columns):

                    is_null = df[target_name].isnull()

                    df_train[path] = df[~is_null].drop(target_name, axis=1)
                    df_test[path] = df[is_null].drop(target_name, axis=1)
                    y_train[path] = df[target_name][~is_null]

                else:

                    df_test[path] = df

                del df

            # Exceptions

            if (sum([df_train[path].shape[0]
                     for path in df_train.keys()]) == 0):
                raise ValueError("You have no train dataset. "
                                 "Please check that the "
                                 "target name is correct.")

            if ((sum([df_test[path].shape[0]
                      for path in df_test.keys()]) == 0) & (self.verbose)):
                print("")
                print("You have no test dataset !")

            # Finding the common subset of features

            for i, df in enumerate(df_train.values()):

                if (i == 0):
                    col_train = df.columns
                else:
                    col_train = list(set(col_train) & set(df.columns))

            for i, df in enumerate(df_test.values()):

                if (i == 0):
                    col_test = df.columns
                else:
                    col_test = list(set(col_test) & set(df.columns))

            # Subset of common features

            col = sorted(list(set(col_train) & set(col_test)))

            if (self.verbose):
                print("")
                print("> Number of common features : " + str(len(col)))

            ##############################################################
            # Creating train, test and target dataframes
            ##############################################################

            print("")
            print("gathering and crunching for train and test datasets ...")

            # TODO: Optimize
            df_train = pd.concat([df[col] for df in df_train.values()])
            df_test = pd.concat([df[col] for df in df_test.values()])
            y_train = pd.concat([y for y in y_train.values()])  # TODO: optimize

            # Checking shape of the target

            if (type(y_train) == pd.core.frame.DataFrame):
                raise ValueError("Your target contains more than one column !"
                                 " Please check that only one column "
                                 "is named " + target_name)

            else:
                pass

            # Handling indices

            if (self.verbose):
                print("reindexing for train and test datasets ...")

            if (df_train.index.nunique() < df_train.shape[0]):
                df_train.index = range(df_train.shape[0])

            if (df_test.index.nunique() < df_test.shape[0]):
                df_test.index = range(df_test.shape[0])

            if (y_train.index.nunique() < y_train.shape[0]):
                y_train.index = range(y_train.shape[0])

            # Dropping duplicates

            if (self.verbose):
                print("dropping training duplicates ...")

            # Temp adding target to check (x,y) duplicates...
            df_train[target_name] = y_train.values
            df_train = df_train.drop_duplicates()
            del df_train[target_name]
            y_train = y_train.loc[df_train.index]  # TODO: Need to reindex ?

            # Deleting constant variables

            if (self.verbose):
                print("dropping constant variables on training set ...")

            for var in col:
                if (df_train[var].nunique(dropna=False) == 1):
                    del df_train[var]
                    del df_test[var]

            # Missing values

            sparse_features = (df_train.isnull().sum() * 100. /
                               df_train.shape[0]
                               ).sort_values(ascending=False)
            sparse = True
            if (sparse_features.max() == 0.0):
                sparse = False

            # Print information

            if (self.verbose):

                print("")
                print("> Number of categorical features:"
                      " " + str(len(df_train.dtypes[df_train.dtypes == 'object'].index)))  # noqa
                print("> Number of numerical features:"
                      " " + str(len(df_train.dtypes[df_train.dtypes != 'object'].index)))  # noqa
                print("> Number of training samples : " + str(df_train.shape[0]))
                print("> Number of test samples : " + str(df_test.shape[0]))

                if (sparse):
                    print("")
                    print("> Top sparse features "
                          "(% missing values on train set):")
                    print(np.round(sparse_features[sparse_features > 0.0][:5],
                                   1))
                else:
                    print("")
                    print("> You have no missing values on train set...")

            ##############################################################
            # Encoding target
            ##############################################################

            task = "regression"
            count = y_train.nunique()

            if (count <= 2):
                task = "classification"

            else:
                if (y_train.dtype == object):
                    task = "classification"
                else:
                    # no need to convert into float
                    pass

            if (self.verbose):
                print("")
                print("> Task : " + task)

            if (task == "classification"):

                if (self.verbose):
                    print(y_train.value_counts())
                    print("")
                    print("encoding target ...")

                enc = LabelEncoder()
                y_train = pd.Series(enc.fit_transform(y_train.values),
                                    index=y_train.index,
                                    name=target_name,
                                    dtype='int')

                if count == 1:
                    warnings.warn("Your target set has only one class ! "
                                  "Please check it is correct, otherwise "
                                  "there is no need to use MLBox...")

            else:
                if (self.verbose):
                    print(y_train.describe())

            ##############################################################
            # Dumping
            ##############################################################

            # Creating a folder to save the files and target encoder

            try:
                os.mkdir(self.to_path)
            except OSError:
                pass

            if (self.to_hdf5):

                start_time = time.time()

                if (self.verbose):
                    print("")
                    print("dumping files into directory : " + self.to_path)

                # Temp adding target to dump train file...
                df_train[target_name] = y_train.values
                df_train.to_hdf(self.to_path + '/df_train.h5', 'train')
                del df_train[target_name]

                if (self.verbose):
                    print("train dumped")

                df_test.to_hdf(self.to_path + '/df_test.h5', 'test')

                if (self.verbose):
                    print("test dumped")
                    print("CPU time: %s seconds" % (time.time() - start_time))

            else:
                pass

            if (task == "classification"):
                fhand = open(self.to_path + '/target_encoder.obj', 'wb')
                pickle.dump(enc, fhand)
                fhand.close()
            else:
                pass

            return {"train": df_train,
                    "test": df_test,
                    'target': y_train}