Source code for lightwood.helpers.ts

from typing import Tuple, Dict
from datetime import datetime

import numpy as np
import pandas as pd


def get_ts_groups(df: pd.DataFrame, tss) -> list:
    group_combinations = ['__default']
    if tss.group_by:
        groups = [tuple([g]) if not isinstance(g, tuple) else g
                  for g in list(df.groupby(by=tss.group_by).groups.keys())]
        group_combinations.extend(groups)
    return group_combinations
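
# Illustrative usage sketch (not part of the module): with a minimal stand-in
# for the timeseries settings object (only a `group_by` attribute is needed
# here), `get_ts_groups` returns '__default' plus one tuple per observed group
# combination. Depending on the pandas version, raw group keys may be scalars
# or tuples; the comprehension above normalizes scalars into tuples.
#
#   from types import SimpleNamespace
#   df = pd.DataFrame({'store': ['a', 'a', 'b'], 'sales': [1, 2, 3]})
#   get_ts_groups(df, SimpleNamespace(group_by=['store']))
#   # -> ['__default', ('a',), ('b',)]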


def get_delta(df: pd.DataFrame, tss) -> Tuple[Dict, Dict, Dict]:
    """
    Infer the sampling interval of each time series, by picking the most popular time interval observed in the training data.

    :param df: Dataframe with time series data.
    :param tss: timeseries settings

    :return:
    Three dictionaries with group combination tuples as keys. Their values are, respectively, the inferred sampling delta, seasonal period(s), and sampling frequency of each series.
    """  # noqa
    df = df.copy()  # TODO: necessary?
    original_col = f'__mdb_original_{tss.order_by}'
    order_col = original_col if original_col in df.columns else tss.order_by
    deltas = {"__default": df[order_col].astype(float).diff().value_counts().index[0]}
    freq, period = detect_freq_period(deltas["__default"], tss, len(df))
    periods = {"__default": [period]}
    freqs = {"__default": freq}

    if tss.group_by:
        grouped = df.groupby(by=tss.group_by)
        for group, subset in grouped:
            if subset.shape[0] > 1:
                deltas[group] = subset[order_col].diff().value_counts().index[0]
                freq, period = detect_freq_period(deltas[group], tss, len(subset))
                freqs[group] = freq
                periods[group] = [period] if period is not None else [1]
            else:
                deltas[group] = 1.0
                periods[group] = [1]
                freqs[group] = 'S'

    return deltas, periods, freqs
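
# Illustrative sketch of the inference rule used by `get_delta`: the delta of
# a series is simply the most frequent difference between consecutive values
# of the order column.
#
#   s = pd.Series([0.0, 60.0, 120.0, 180.0, 300.0])  # one irregular gap
#   s.diff().value_counts().index[0]                 # -> 60.0, the modal interval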


def get_inferred_timestamps(df: pd.DataFrame, col: str, deltas: dict, tss, stat_analysis,
                            time_format='') -> pd.DataFrame:
    horizon = tss.horizon

    last = np.vstack(df[f'order_{col}'].dropna().values)[:, -1]

    if tss.group_by:
        gby = [f'group_{g}' for g in tss.group_by]
        series_delta = df[gby].apply(lambda x: deltas.get(tuple(x.values.tolist()),
                                                          deltas['__default']), axis=1).values
        series_delta = series_delta.reshape(-1, 1)
    else:
        series_delta = np.full_like(df.values[:, 0:1], deltas['__default'])

    last = np.repeat(np.expand_dims(last, axis=1), horizon, axis=1)
    lins = np.linspace(0, horizon - 1, num=horizon)
    series_delta = np.repeat(series_delta, horizon, axis=1)
    timestamps = last + series_delta * lins

    if time_format:
        if time_format.lower() == 'infer':
            tformat = stat_analysis.ts_stats['order_format']
        else:
            tformat = time_format

        if tformat:
            def _strfts(elt):
                return datetime.utcfromtimestamp(elt).strftime(tformat)
            timestamps = np.vectorize(_strfts)(timestamps)

    # truncate to horizon
    timestamps = timestamps[:, :horizon]

    # preserves original input format if horizon == 1
    if tss.horizon == 1:
        timestamps = timestamps.squeeze()

    df[f'order_{col}'] = timestamps.tolist()
    return df[f'order_{col}']
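
# Illustrative sketch of the extrapolation performed above: each forecast
# timestamp is the last observed timestamp plus k * delta for k = 0 .. horizon - 1,
# computed for all rows at once via broadcasting.
#
#   last = np.array([[100.0], [200.0]])   # last observed timestamp per row
#   delta = np.array([[10.0], [5.0]])     # per-series sampling interval
#   horizon = 3
#   last + delta * np.linspace(0, horizon - 1, num=horizon)
#   # -> array([[100., 110., 120.],
#   #           [200., 205., 210.]])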


def add_tn_num_conf_bounds(data: pd.DataFrame, tss_args):
    """
    Deprecated. Instead, we now opt for the much better solution of having scores for each timestep (see all TS classes in analysis/nc).

    Add confidence (and bounds if applicable) to t+n predictions, for n>1.
    TODO: active research question: how to guarantee 1-e coverage for t+n, n>1.

    For now, (conservatively) increases width by the confidence times the log of the time step (and a scaling factor).
    """  # noqa
    for col in ['confidence', 'lower', 'upper']:
        data[col] = data[col].astype(object)

    for idx, row in data.iterrows():
        error_increase = [row['confidence'][0]] + \
                         [row['confidence'][0] * np.log(np.e + t / 2)  # offset by e so that y intercept is 1
                          for t in range(1, tss_args.horizon)]
        # use .at to avoid chained-assignment warnings when writing list-valued cells
        data.at[idx, 'confidence'] = [row['confidence'][0] for _ in range(tss_args.horizon)]

        preds = row['prediction']
        width = row['upper'][0] - row['lower'][0]
        data.at[idx, 'lower'] = [pred - (width / 2) * modifier for pred, modifier in zip(preds, error_increase)]
        data.at[idx, 'upper'] = [pred + (width / 2) * modifier for pred, modifier in zip(preds, error_increase)]

    return data
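
# Illustrative numbers for the widening rule above: the interval half-width at
# step t is scaled by log(e + t / 2), which equals 1 at t=0 and grows slowly.
#
#   [round(np.log(np.e + t / 2), 2) for t in range(4)]
#   # -> [1.0, 1.17, 1.31, 1.44]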

def add_tn_cat_conf_bounds(data: pd.DataFrame, tss_args):
    data['confidence'] = data['confidence'].astype(object)
    for idx, row in data.iterrows():
        data.at[idx, 'confidence'] = [row['confidence'] for _ in range(tss_args.horizon)]
    return data


class Differencer:
    def __init__(self):
        self.original_train_series = None
        self.diffed_train_series = None
        self.first_train_value = None
        self.last_train_value = None

    def diff(self, series: np.ndarray) -> pd.Series:
        series = self._flatten_series(series)
        s = pd.Series(series)
        return s.shift(-1) - s

    def fit(self, series: np.ndarray) -> None:
        series = self._flatten_series(series)
        self.first_train_value = series[0]
        self.last_train_value = series[-1]
        self.original_train_series = series
        self.diffed_train_series = self.diff(series)

    def transform(self, series: np.ndarray) -> pd.Series:
        series = self._flatten_series(series)
        return self.diff(series).shift(1).fillna(0)

    def inverse_transform(self, series: pd.Series, init=None) -> pd.Series:
        # check against None explicitly so that a legitimate init value of 0 is not discarded
        origin = init if init is not None else self.last_train_value
        s = pd.Series(origin)
        s = pd.concat([s, series]).dropna()  # Series.append was removed in pandas 2.0
        return s.expanding().sum()

    @staticmethod
    def _flatten_series(series: np.ndarray) -> np.ndarray:
        if len(series.shape) > 2:
            raise Exception(f"Input data should be shaped (A,) or (A, 1), got {series.shape}")
        elif len(series.shape) == 2:
            series = series.flatten()
        return series


def detect_freq_period(deltas: float, tss, n_points) -> tuple:
    """
    Helper method that, based on the most popular interval for a time series, determines its seasonal periodicity (sp).
    This bit of information can be crucial for good modelling with methods like ARIMA.

    Supported time intervals are:
        * 'yearly'
        * 'quarterly'
        * 'bimonthly'
        * 'monthly'
        * 'weekly'
        * 'daily'
        * 'hourly'
        * 'minute'
        * 'second'

    Note: all computations assume that the first provided `order_by` column is the one that specifies the sp.

    :param deltas: the most popular interval of a time series, as inferred by `get_delta`.
    :param tss: timeseries settings.
    :param n_points: number of observed data points; candidate periods longer than this are discarded.

    :return: a tuple with 1) the pandas-compatible sampling frequency string and 2) the detected seasonal period (sp).
    """  # noqa
    secs_to_interval = {
        'yearly': 60 * 60 * 24 * 365,
        'quarterly': 60 * 60 * 24 * 365 // 4,
        'bimonthly': 60 * 60 * 24 * 31 * 2,
        'monthly': 60 * 60 * 24 * 31,
        'weekly': 60 * 60 * 24 * 7,
        'daily': 60 * 60 * 24,
        'hourly': 60 * 60,
        'minute': 60,
        'second': 1,
        'millisecond': 0.001,
        'microsecond': 1e-6,
        'nanosecond': 1e-9,
        'constant': 0
    }
    freq_to_period = {interval: period for (interval, period) in tss.interval_periods}
    for tag, period in (('yearly', 1), ('quarterly', 4), ('bimonthly', 6), ('monthly', 12),
                        ('weekly', 52), ('daily', 7), ('hourly', 24),
                        ('minute', 1), ('second', 1), ('constant', 0)):
        if tag not in freq_to_period.keys():
            if period <= n_points:
                freq_to_period[tag] = period
            else:
                freq_to_period[tag] = None

    diffs = [(tag, abs(deltas - secs)) for tag, secs in secs_to_interval.items()]
    freq, min_diff = sorted(diffs, key=lambda x: x[1])[0]
    multiplier = 1
    if secs_to_interval[freq]:
        multiplier += int(min_diff / secs_to_interval[freq])
    return freq_to_pandas(freq, multiplier=multiplier), freq_to_period.get(freq, 1)


def freq_to_pandas(freq, multiplier=1):
    mapping = {
        'constant': 'N',
        'nanosecond': 'N',
        'microsecond': 'us',
        'millisecond': 'ms',
        'second': 'S',
        'minute': 'T',
        'hourly': 'H',     # custom logic
        'daily': 'D',      # custom logic
        'weekly': 'W',     # anchor logic
        'monthly': 'M',    # custom logic
        'bimonthly': 'M',
        'quarterly': 'Q',  # anchor and custom logic
        'yearly': 'Y',     # anchor and custom logic
    }

    # TODO: implement custom dispatch for better precision, use row sample if available:
    # pandas.pydata.org/docs/user_guide/timeseries.html
    items = [mapping[freq]]
    if multiplier > 1:
        items.insert(0, str(multiplier))
    return ''.join(items)


def filter_ts(df: pd.DataFrame, tss, n_rows=1):
    """
    This method triggers only for timeseries datasets.

    It returns a dataframe that filters out all but the first ``n_rows`` per group.
    """  # noqa
    if tss.is_timeseries:
        gby = tss.group_by
        if gby is None:
            df = df.iloc[[0]]
        else:
            ndf = pd.DataFrame(columns=df.columns)
            grouped = df.groupby(by=gby)
            for group, subdf in grouped:
                if group != '__default':
                    ndf = pd.concat([ndf, subdf.iloc[:n_rows]])
            df = ndf
    return df
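
# Illustrative round-trips for the helpers above (sketches, using only names
# defined in this module). `transform` produces lagged first differences, and
# `inverse_transform` cumulatively sums future differences starting from the
# last training value:
#
#   d = Differencer()
#   train = np.array([1.0, 3.0, 6.0, 10.0])
#   d.fit(train)
#   d.transform(train).tolist()        # -> [0.0, 2.0, 3.0, 4.0]
#   inc = pd.Series([5.0, 6.0])        # future first differences
#   d.inverse_transform(inc).tolist()  # -> [10.0, 15.0, 21.0]
#
#   freq_to_pandas('minute', multiplier=5)  # -> '5T', i.e. every 5 minutes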