Source code for lightwood.mixer.xgboost_array

from typing import Dict, List, Union, Optional
from copy import deepcopy

import numpy as np
import pandas as pd

from lightwood.helpers.log import log
from lightwood.encoder.base import BaseEncoder
from lightwood.mixer.base import BaseMixer
from lightwood.mixer.xgboost import XGBoostMixer
from lightwood.api.types import PredictionArguments, TimeseriesSettings
from lightwood.data.encoded_ds import EncodedDs, ConcatedEncodedDs


class XGBoostArrayMixer(BaseMixer):
    """XGBoost-based model, intended for usage in forecasting tasks."""
    models: List[XGBoostMixer]
    submodel_stop_after: float
    target: str
    supports_proba: bool
    ts_analysis: Dict
    tss: TimeseriesSettings

    def __init__(
            self,
            stop_after: float,
            target: str,
            dtype_dict: Dict[str, str],
            input_cols: List[str],
            fit_on_dev: bool,
            target_encoder: BaseEncoder,
            ts_analysis: Dict[str, object],
            use_stl: bool,
            tss: TimeseriesSettings
    ):
        super().__init__(stop_after)
        self.tss = tss
        self.horizon = tss.horizon
        self.submodel_stop_after = stop_after / self.horizon
        self.target = target
        self.offset_pred_cols = [f'{self.target}_timestep_{i}' for i in range(1, self.horizon)]

        if set(input_cols) != {self.tss.order_by}:
            input_cols.remove(self.tss.order_by)

        for col in self.offset_pred_cols:
            dtype_dict[col] = dtype_dict[self.target]

        self.models = [XGBoostMixer(self.submodel_stop_after,
                                    target_col,
                                    dtype_dict,
                                    input_cols,
                                    False,  # fit_on_dev
                                    False,  # use_optuna
                                    target_encoder)
                       for _, target_col in zip(range(self.horizon), [target] + self.offset_pred_cols)]
        self.ts_analysis = ts_analysis
        self.supports_proba = False
        self.use_stl = False
        self.stable = False

    def _fit(self, train_data: EncodedDs, dev_data: EncodedDs, submodel_method='fit') -> None:
        original_train = deepcopy(train_data.data_frame)
        original_dev = deepcopy(dev_data.data_frame)

        for timestep in range(self.horizon):
            getattr(self.models[timestep], submodel_method)(train_data, dev_data)

        # restore dfs
        train_data.data_frame = original_train
        dev_data.data_frame = original_dev
    def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None:
        log.info('Started fitting XGBoost models for array prediction')
        self._fit(train_data, dev_data, submodel_method='fit')
    def partial_fit(self, train_data: EncodedDs, dev_data: EncodedDs, args: Optional[dict] = None) -> None:
        log.info('Updating array of XGBoost models...')
        self._fit(train_data, dev_data, submodel_method='partial_fit')
    def __call__(self, ds: Union[EncodedDs, ConcatedEncodedDs],
                 args: PredictionArguments = PredictionArguments()) -> pd.DataFrame:
        if args.predict_proba:
            log.warning('This model does not output probability estimates')

        original_df = deepcopy(ds.data_frame)
        length = sum(ds.encoded_ds_lengths) if isinstance(ds, ConcatedEncodedDs) else len(ds)
        ydf = pd.DataFrame(0,  # zero-filled
                           index=np.arange(length),
                           columns=[f'prediction_{i}' for i in range(self.horizon)])

        for timestep in range(self.horizon):
            ydf[f'prediction_{timestep}'] = self.models[timestep](ds, args)['prediction'].values

        if self.models[0].positive_domain:
            ydf = ydf.clip(0)

        ydf['prediction'] = ydf.values.tolist()

        ds.data_frame = original_df

        return ydf[['prediction']]
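
The snippet below is a small, self-contained illustration (not part of the module) of the output-assembly step in __call__: each submodel fills one prediction_{i} column, and the per-timestep columns are then collapsed into a single list-valued 'prediction' column, so every input row ends up with a list of `horizon` forecasted values. The numbers are made up purely for demonstration; in practice the mixer itself is constructed and fitted by lightwood's generated predictor code, using the __init__ arguments shown above.

import numpy as np
import pandas as pd

horizon = 3
# Stand-in for the per-timestep outputs gathered from the submodels (made-up values).
ydf = pd.DataFrame(np.arange(6.0).reshape(2, 3),
                   columns=[f'prediction_{i}' for i in range(horizon)])

# Same trick as in __call__: collapse the timestep columns into one list per row.
ydf['prediction'] = ydf.values.tolist()
print(ydf[['prediction']])
# Each row now holds a list of `horizon` values, e.g. [0.0, 1.0, 2.0] and [3.0, 4.0, 5.0].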