import time
import math
import torch
import numpy as np
import pandas as pd
import optuna
from typing import Dict, Union, Optional
from optuna import trial as trial_module
from sklearn import clone
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import check_cv, cross_val_predict
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from type_infer.dtype import dtype
from lightwood.helpers.log import log
from lightwood.encoder.base import BaseEncoder
from lightwood.data.encoded_ds import EncodedDs, ConcatedEncodedDs
from lightwood.mixer.base import BaseMixer
from lightwood.api.types import PredictionArguments
class RandomForest(BaseMixer):
model: Union[RandomForestClassifier, RandomForestRegressor]
dtype_dict: dict
target: str
fit_on_dev: bool
use_optuna: bool
supports_proba: bool
def __init__(
self,
stop_after: float,
target: str,
dtype_dict: Dict[str, str],
fit_on_dev: bool,
target_encoder: BaseEncoder,
use_optuna: bool = False,
):
"""
The `RandomForest` mixer supports both regression and classification tasks.
        It wraps sklearn.ensemble.RandomForestRegressor and sklearn.ensemble.RandomForestClassifier, choosing between them based on the target dtype.
(https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html)
(https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html)
:param stop_after: time budget in seconds.
:param target: name of the target column that the mixer will learn to predict.
:param dtype_dict: dictionary with dtypes of all columns in the data.
        :param fit_on_dev: whether to fit on the dev dataset.
        :param target_encoder: encoder for the target column, used to decode regressor outputs at inference time.
        :param use_optuna: whether to activate the automated hyperparameter search (optuna-based). Note that setting this flag to `True` does not guarantee the search will run; a speed criterion is checked first (i.e., if a single fit is too slow relative to the time budget, the search is skipped).
""" # noqa
super().__init__(stop_after)
self.target = target
self.dtype_dict = dtype_dict
self.fit_on_dev = fit_on_dev
self.use_optuna = use_optuna
self.target_encoder = target_encoder
self.model = None
self.positive_domain = False
self.num_trials = 5
self.cv = 3
        self.map = {}  # maps each class label to its column index in predict_proba output (set in fit, used by _multi_logloss)
self.cls_dtypes = [dtype.categorical, dtype.binary, dtype.cat_tsarray]
self.float_dtypes = [dtype.float, dtype.quantity, dtype.num_tsarray]
self.num_dtypes = [dtype.integer] + self.float_dtypes
self.supports_proba = dtype_dict[target] in self.cls_dtypes
self.is_classifier = self.supports_proba
self.stable = True
def _multi_logloss(self, y_true: np.ndarray, y_pred: np.ndarray, eps: float = 1e-15):
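        """
        Mean multi-class log loss: the average over samples of -log(p_true), where p_true is
        the predicted probability of the sample's true class (clipped to [eps, 1 - eps]).
        For example, with classes mapped as {'a': 0, 'b': 1}, y_true = ['a'] and
        y_pred = [[0.8, 0.2]], the score is -log(0.8) ~= 0.223.
        """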
        # note: this metric is only used for the Optuna search; the 'evaluate model effects' step in fit() uses model.score instead
y_pred = np.clip(y_pred, eps, 1 - eps)
score = np.mean([-math.log(y_pred[i][self.map[y]]) for i, y in enumerate(y_true)])
return score
    def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None:
"""
Fits the RandomForest model.
:param train_data: encoded features for training dataset
:param dev_data: encoded features for dev dataset
"""
started = time.time()
log.info('Started fitting RandomForest model')
output_dtype = self.dtype_dict[self.target]
if output_dtype not in self.cls_dtypes + self.num_dtypes:
log.error(f'RandomForest mixer not supported for type: {output_dtype}')
raise Exception(f'RandomForest mixer not supported for type: {output_dtype}')
# concat the data if fit on dev
if self.fit_on_dev:
train_data = ConcatedEncodedDs([train_data, dev_data])
# initialize the model
init_params = {
'n_estimators': 50,
'max_depth': 5,
'bootstrap': True,
'n_jobs': -1,
'random_state': 0
}
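        # the Optuna search further below may override n_estimators and criterion on top of these defaults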
if self.is_classifier:
X = train_data.get_encoded_data(include_target=False)
Y = train_data.get_column_original_data(self.target)
self.model = RandomForestClassifier(**init_params)
            self.model.fit(X, Y)  # sample_weight is not currently passed
self.map = {cat: idx for idx, cat in enumerate(self.model.classes_)} # for multi_logloss
else:
X = train_data.get_encoded_data(include_target=False)
Y = train_data.get_encoded_column_data(self.target)
self.model = RandomForestRegressor(**init_params)
            self.model.fit(X, Y)  # sample_weight is not currently passed
# optimize params
metric, predict_method = (self._multi_logloss, 'predict_proba') if self.is_classifier \
else (mean_squared_error, 'predict')
def objective(trial: trial_module.Trial):
            criterion = trial.suggest_categorical("criterion", ["gini", "entropy"]) if self.is_classifier else 'squared_error'
params = {
'n_estimators': trial.suggest_int('n_estimators', 2, 512),
'criterion': criterion,
}
self.model.set_params(**params)
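            # cross_val_predict returns out-of-fold predictions; check_cv below rebuilds the same
            # default (Stratified)KFold splits so the metric can be averaged per fold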
y_pred = cross_val_predict(self.model, X, Y, cv=self.cv, method=predict_method)
            cv = check_cv(self.cv, Y, classifier=self.is_classifier)
score = np.mean([metric(np.array(Y)[val_idx], y_pred[val_idx]) for _, val_idx in cv.split(X, Y)])
return score
elapsed = time.time() - started
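        # speed criterion: estimate how many fit-sized iterations still fit in the remaining
        # time budget (with a margin of two), and cap the number of Optuna trials accordingly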
num_trials = max(min(int(self.stop_after / elapsed) - 2, self.num_trials), 0)
if self.use_optuna:
log.info(f'The number of trials (Optuna) is {num_trials}.')
if self.use_optuna and num_trials > 0:
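            # baseline: training-set score of the already-fitted default model, used to decide
            # whether the Optuna-tuned model is actually an improvement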
init_score = metric(Y, getattr(self.model, predict_method)(X))
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=num_trials)
opt_model = clone(self.model)
opt_model.set_params(**study.best_params)
opt_model.fit(X, Y)
optuna_score = metric(Y, getattr(opt_model, predict_method)(X))
log.info(f'init_score: {init_score}, optuna_score: {optuna_score}')
            if init_score <= optuna_score:  # lower is better; keep the default model unless Optuna strictly improves
self.model.set_params(**init_params)
else:
self.model = opt_model
log.info(f'RandomForest parameters of the best trial: {study.best_params}')
        # evaluate model effects (model.score returns accuracy for classifiers, R^2 for regressors)
        if self.fit_on_dev:
            log.info(f'RandomForest score on train data: {self.model.score(X, Y)}')
X = dev_data.get_encoded_data(include_target=False)
if self.is_classifier:
Y = dev_data.get_column_original_data(self.target)
else:
Y = dev_data.get_encoded_column_data(self.target)
            log.info(f'RandomForest score on dev data: {self.model.score(X, Y)}')
else:
            log.info(f'RandomForest score on train data: {self.model.score(X, Y)}')
    def partial_fit(self, train_data: EncodedDs, dev_data: EncodedDs, args: Optional[dict] = None) -> None:
"""
The RandomForest mixer does not support updates. If the model does not exist, a new one will be created and fitted.
:param train_data: encoded features for (new) training dataset
:param dev_data: encoded features for (new) dev dataset
""" # noqa
if self.model is None:
self.fit(train_data, dev_data)
def __call__(self, ds: EncodedDs,
args: PredictionArguments = PredictionArguments()) -> pd.DataFrame:
"""
Call a trained RandomForest mixer to output predictions for the target column.
:param ds: input data with values for all non-target columns.
:param args: inference-time arguments (e.g. whether to output predicted labels or probabilities).
:return: dataframe with predictions.
"""
data = ds.get_encoded_data(include_target=False).numpy()
if self.is_classifier:
predictions = self.model.predict_proba(data)
decoded_predictions = self.model.classes_.take(np.argmax(predictions, axis=1), axis=0)
else:
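            # 1-D outputs are taken as-is; multi-dimensional outputs are assumed to be in the
            # target's encoded space and are decoded back through the target encoder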
predictions = self.model.predict(data)
if predictions.ndim == 1:
decoded_predictions = predictions
else:
decoded_predictions = self.target_encoder.decode(torch.Tensor(predictions))
if self.positive_domain:
decoded_predictions = [max(0, p) for p in decoded_predictions]
ydf = pd.DataFrame({'prediction': decoded_predictions})
if args.predict_proba and self.supports_proba:
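            # expose per-class probabilities as extra columns, using the `__mdb_proba_<label>` naming convention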
for idx, label in enumerate(self.model.classes_):
ydf[f'__mdb_proba_{label}'] = predictions[:, idx]
return ydf
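

# Illustrative usage sketch (not part of the module): assuming `train_ds` and `dev_ds` are
# EncodedDs instances from a lightwood pipeline, `dtype_dict` describes the columns and
# `encoders['price']` is a fitted encoder for a hypothetical 'price' target, the mixer could
# be trained and queried roughly like this:
#
#   mixer = RandomForest(stop_after=60.0, target='price', dtype_dict=dtype_dict,
#                        fit_on_dev=True, target_encoder=encoders['price'], use_optuna=True)
#   mixer.fit(train_ds, dev_ds)
#   predictions_df = mixer(dev_ds)  # DataFrame with a 'prediction' column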