from typing import Dict, List, Union, Optional
from copy import deepcopy
import numpy as np
import pandas as pd
from lightwood.helpers.log import log
from lightwood.encoder.base import BaseEncoder
from lightwood.mixer.base import BaseMixer
from lightwood.mixer.xgboost import XGBoostMixer
from lightwood.api.types import PredictionArguments, TimeseriesSettings
from lightwood.data.encoded_ds import EncodedDs, ConcatedEncodedDs
class XGBoostArrayMixer(BaseMixer):
    """XGBoost-based model, intended for usage in forecasting tasks.

    Trains one independent :class:`XGBoostMixer` per forecast timestep: the first
    submodel predicts the target itself, and each subsequent submodel predicts the
    corresponding ``{target}_timestep_i`` shifted column. At inference time, the
    per-timestep predictions are gathered into a single list-valued ``prediction``
    column (one list of length ``horizon`` per row).
    """
    models: List[XGBoostMixer]
    submodel_stop_after: float
    target: str
    supports_proba: bool
    ts_analysis: Dict
    tss: TimeseriesSettings

    def __init__(
            self,
            stop_after: float,
            target: str,
            dtype_dict: Dict[str, str],
            input_cols: List[str],
            fit_on_dev: bool,
            target_encoder: BaseEncoder,
            ts_analysis: Dict[str, object],
            use_stl: bool,
            tss: TimeseriesSettings
    ):
        """
        :param stop_after: total time budget (seconds); split evenly across the per-timestep submodels.
        :param target: name of the target column to forecast.
        :param dtype_dict: mapping of column names to Lightwood dtypes; mutated in place to register the shifted target columns.
        :param input_cols: feature column names; mutated in place to drop the order-by column.
        :param fit_on_dev: accepted for interface compatibility; submodels are always built with ``fit_on_dev=False``.
        :param target_encoder: encoder for the target column, shared by all submodels.
        :param ts_analysis: time series analysis artifacts produced during data preparation.
        :param use_stl: accepted for interface compatibility; STL decomposition is force-disabled (see NOTE below).
        :param tss: time series settings; ``horizon`` determines the number of submodels.
        """
        super().__init__(stop_after)
        self.tss = tss
        self.horizon = tss.horizon
        # Split the overall time budget evenly across the `horizon` submodels.
        self.submodel_stop_after = stop_after / self.horizon
        self.target = target
        # Columns holding the i-steps-ahead target values (created upstream by the
        # timeseries transform); timestep 0 is the original target column itself.
        self.offset_pred_cols = [f'{self.target}_timestep_{i}' for i in range(1, self.horizon)]
        # Drop the order-by column from the features unless it is the only input.
        # Membership check guards against a ValueError when it is already absent.
        if self.tss.order_by in input_cols and set(input_cols) != {self.tss.order_by}:
            input_cols.remove(self.tss.order_by)
        # Shifted target columns share the dtype of the original target.
        for col in self.offset_pred_cols:
            dtype_dict[col] = dtype_dict[self.target]
        # One submodel per timestep; each gets an equal share of the time budget.
        self.models = [XGBoostMixer(self.submodel_stop_after,
                                    target_col,
                                    dtype_dict,
                                    input_cols,
                                    False,  # fit_on_dev
                                    False,  # use_optuna
                                    target_encoder)
                       for _, target_col in zip(range(self.horizon), [target] + self.offset_pred_cols)]
        self.ts_analysis = ts_analysis
        self.supports_proba = False
        # NOTE(review): the `use_stl` argument is ignored and STL is force-disabled
        # here — confirm this is intentional before honoring the parameter.
        self.use_stl = False
        self.stable = False

    def _fit(self, train_data: EncodedDs, dev_data: EncodedDs, submodel_method: str = 'fit') -> None:
        """Fit (or partially fit) each per-timestep submodel, then restore the dataframes.

        Submodel fitting may mutate the underlying dataframes, so deep copies are
        taken up front and written back afterwards.

        :param train_data: training dataset.
        :param dev_data: validation dataset.
        :param submodel_method: name of the submodel method to invoke ('fit' or 'partial_fit').
        """
        original_train = deepcopy(train_data.data_frame)
        original_dev = deepcopy(dev_data.data_frame)

        for timestep in range(self.horizon):
            getattr(self.models[timestep], submodel_method)(train_data, dev_data)

        # Restore dataframes mutated during submodel fitting.
        train_data.data_frame = original_train
        dev_data.data_frame = original_dev

    def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None:
        """Fit all per-timestep XGBoost submodels."""
        # Fixed: the previous message incorrectly referred to "LGBM" models.
        log.info('Started fitting XGBoost models for array prediction')
        self._fit(train_data, dev_data, submodel_method='fit')

    def partial_fit(self, train_data: EncodedDs, dev_data: EncodedDs, args: Optional[dict] = None) -> None:
        """Update all per-timestep XGBoost submodels with new data.

        :param args: accepted for interface compatibility; currently unused.
        """
        # Fixed: the previous message incorrectly referred to "LGBM" models.
        log.info('Updating array of XGBoost models...')
        self._fit(train_data, dev_data, submodel_method='partial_fit')

    def __call__(self, ds: Union[EncodedDs, ConcatedEncodedDs],
                 args: PredictionArguments = PredictionArguments()) -> pd.DataFrame:
        """Predict the full horizon for every row in `ds`.

        :param ds: dataset to predict on.
        :param args: prediction arguments; probability output is not supported.
        :return: dataframe with a single `prediction` column, each cell a list of
                 length `horizon` with the per-timestep forecasts.
        """
        if args.predict_proba:
            log.warning('This model does not output probability estimates')

        original_df = deepcopy(ds.data_frame)
        length = sum(ds.encoded_ds_lengths) if isinstance(ds, ConcatedEncodedDs) else len(ds)
        ydf = pd.DataFrame(0,  # zero-filled
                           index=np.arange(length),
                           columns=[f'prediction_{i}' for i in range(self.horizon)])

        for timestep in range(self.horizon):
            ydf[f'prediction_{timestep}'] = self.models[timestep](ds, args)['prediction'].values

        # Clamp negative forecasts when the first submodel reports a positive domain.
        if self.models[0].positive_domain:
            ydf = ydf.clip(0)

        # Collapse the per-timestep columns into one list-valued column.
        ydf['prediction'] = ydf.values.tolist()
        # Restore the dataframe potentially mutated by submodel prediction.
        ds.data_frame = original_df
        return ydf[['prediction']]