Source code for lightwood.ensemble.ts_stacked_ensemble

from copy import deepcopy
from typing import List, Optional

import torch
from torch import nn
from torch.optim import SGD
import numpy as np
import pandas as pd

from lightwood.mixer.base import BaseMixer
from lightwood.ensemble.stacked_ensemble import StackedEnsemble
from lightwood.encoder.array.ts_num_array import TsArrayNumericEncoder
from lightwood.api.types import PredictionArguments
from type_infer.dtype import dtype
from lightwood.data.encoded_ds import EncodedDs
from lightwood.helpers.log import log


[docs]class TsStackedEnsemble(StackedEnsemble): """ Thin wrapper for `StackedEnsemble` that enables forecasting support. """ def __init__(self, target, mixers: List[BaseMixer], data: EncodedDs, dtype_dict: dict, ts_analysis: dict, args: PredictionArguments, fit: Optional[bool] = True, **kwargs) -> None: dtype_dict = deepcopy(dtype_dict) dtype_dict[target] = dtype.float # hijack to correctly initialize parent class super().__init__(target, mixers, data, dtype_dict, args, fit=False) if not isinstance(data.encoders[target], TsArrayNumericEncoder): raise Exception('This ensemble can only be used to forecast!') self.ts_analysis = ts_analysis self.horizon = self.ts_analysis['tss'].horizon self.target_cols = [target] + [f'{target}_timestep_{t+1}' for t in range(self.horizon - 1)] self.agg_dim = 2 self.opt_max_iter = 1000 if fit and len(mixers) > 1: all_preds = torch.tensor(self.predict(data, args)).squeeze().reshape(-1, self.horizon, len(mixers)) actual = torch.tensor(data.data_frame[self.target_cols].values) nan_mask = actual != actual actual[nan_mask] = 0 all_preds[nan_mask, :] = 0 criterion = nn.SmoothL1Loss() optimizer = SGD([self.mixer_weights], lr=1e-3) def _eval_loss(): optimizer.zero_grad() weighted = torch.sum(all_preds * self.softmax(self.mixer_weights), dim=self.agg_dim) loss = criterion(weighted, actual) loss.backward() return loss for _ in range(self.opt_max_iter): optimizer.step(_eval_loss) self.mixer_weights = self.softmax(self.mixer_weights) log.info(f'Optimal stacking weights: {self.mixer_weights.detach().tolist()}') self.prepared = True def __call__(self, ds: EncodedDs, args: PredictionArguments) -> pd.DataFrame: assert self.prepared mixer_weights = torch.tensor(args.mixer_weights) if args.mixer_weights else self.mixer_weights output = pd.DataFrame() predictions = torch.tensor(np.concatenate(self.predict(ds, args), axis=2)) predictions = (predictions * mixer_weights).sum(axis=self.agg_dim) output['prediction'] = predictions.detach().numpy().tolist() return output