Coverage for mindsdb / integrations / utilities / time_series_utils.py: 0%
109 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import numpy as np
2import pandas as pd
3from pandas.tseries.frequencies import to_offset
4from sklearn.metrics import r2_score
6# handle optional dependency
7try:
8 import hierarchicalforecast # noqa: F401
9 from hierarchicalforecast.core import HierarchicalReconciliation
10 from hierarchicalforecast.methods import BottomUp
11 from hierarchicalforecast.utils import aggregate
12except (ImportError, SystemError):
13 HierarchicalReconciliation = None
14 BottomUp = None
15 aggregate = None
17from mindsdb.utilities import log
# Frequency string that `infer_frequency` falls back to when it cannot infer one.
DEFAULT_FREQUENCY = "D"
# Reconciler class instantiated by `reconcile_forecasts`. This is None when the
# optional `hierarchicalforecast` import at the top of the module failed.
DEFAULT_RECONCILER = BottomUp
def transform_to_nixtla_df(df, settings_dict, exog_vars=None):
    """Transform a dataframe into the format required by StatsForecast.

    Nixtla packages require dataframes to have the following columns:
        unique_id -> the grouping column. If multiple groups are specified then
        we join them into one name using a / char.
        ds -> the date series
        y -> the target variable for prediction

    You can optionally include exogenous regressors after these three columns, but
    they must be numeric.

    Args:
        df: source dataframe; it is copied, never modified.
        settings_dict: mapping with the keys "frequency", "group_by",
            "order_by" and "target".
        exog_vars: optional list of exogenous column names to keep.

    Returns:
        A dataframe with the columns ["unique_id", "ds", "y"] + exog_vars.
    """
    exog_vars = list(exog_vars) if exog_vars else []
    group_by = list(settings_dict["group_by"] or [])
    order_by = settings_dict["order_by"]
    target = settings_dict["target"]
    freq = settings_dict["frequency"]

    nixtla_df = df.copy()

    # Resample every group
    if group_by and group_by != ["__group_by"]:
        # Resample the exogenous columns together with the target; otherwise
        # they would come back as all-NaN columns after the groups are stacked.
        value_columns = [target] + [
            col for col in exog_vars if col != target and col not in group_by
        ]
        resampled_frames = []
        for group_key, group_frame in nixtla_df.groupby(by=group_by):
            # Older pandas yields a scalar key for a single grouping column;
            # normalise so zip() pairs columns with values, not characters.
            if not isinstance(group_key, tuple):
                group_key = (group_key,)
            values = group_frame[value_columns].copy()
            values.index = pd.to_datetime(group_frame[order_by])
            resampled = values.resample(freq).mean()
            for column, value in zip(group_by, group_key):
                resampled[column] = value
            resampled_frames.append(resampled.reset_index())
        if resampled_frames:
            # A single concat avoids quadratic copying and the deprecated
            # concatenation with an empty seed frame.
            nixtla_df = pd.concat(resampled_frames)
        else:
            nixtla_df = pd.DataFrame(columns=nixtla_df.columns)

    # Rename columns to statsforecast names
    rename_map = {target: "y", order_by: "ds"}
    if len(group_by) > 1:
        # Transform group columns into single unique_id column;
        # "/".join needs string values.
        for column in group_by:
            nixtla_df[column] = nixtla_df[column].astype(str)
        nixtla_df["unique_id"] = nixtla_df[group_by].agg("/".join, axis=1)
    elif group_by:
        rename_map[group_by[0]] = "unique_id"
    nixtla_df = nixtla_df.rename(rename_map, axis=1)

    if "unique_id" not in nixtla_df.columns:
        # add to dataframe as it is expected by statsforecast
        nixtla_df["unique_id"] = "1"

    nixtla_df["ds"] = pd.to_datetime(nixtla_df["ds"])
    return nixtla_df[["unique_id", "ds", "y"] + exog_vars]
def get_results_from_nixtla_df(nixtla_df, model_args):
    """Convert a StatsForecast-style dataframe back to the caller's column names.

    The "unique_id" values are copied (single grouping column) or split on "/"
    (several grouping columns) back into the columns listed in
    model_args["group_by"]. "unique_id" is then dropped and "ds" is renamed to
    model_args["order_by"].
    """
    # When unique_id is not already a column the index is kept, so that
    # reset_index() turns it into a column.
    id_is_column = "unique_id" in nixtla_df.columns
    results = nixtla_df.reset_index(drop=id_is_column)

    group_columns = model_args["group_by"]
    if len(group_columns) == 1:
        results[group_columns[0]] = results["unique_id"]
    elif len(group_columns) > 1:
        for position, column in enumerate(group_columns):
            results[column] = results["unique_id"].apply(
                lambda uid, position=position: uid.split("/")[position]
            )

    without_id = results.drop(["unique_id"], axis=1)
    return without_id.rename({"ds": model_args["order_by"]}, axis=1)
def infer_frequency(df, time_column, default=DEFAULT_FREQUENCY):
    """Infer the sampling frequency of `df[time_column]` as a pandas frequency string.

    `pd.infer_freq` is tried first; if it finds nothing, the most common gap
    between consecutive unique timestamps is used instead.

    Args:
        df: dataframe holding the time column.
        time_column: name of the column with the timestamps.
        default: value returned when no frequency can be determined.

    Returns:
        The inferred frequency string, or `default` when inference is not
        possible (too few timestamps, unparsable or unsupported values).
    """
    try:
        # Convert before sorting so non-ISO date strings are ordered
        # chronologically rather than lexicographically.
        timestamps = pd.to_datetime(df[time_column]).sort_values().unique()

        inferred_freq = None
        if len(timestamps) >= 3:
            # pd.infer_freq raises ValueError with fewer than three timestamps.
            # Call it first to get e.g. months & other irregular periods right.
            inferred_freq = pd.infer_freq(timestamps)
        if inferred_freq is None and len(timestamps) >= 2:
            gaps, counts = np.unique(np.diff(timestamps), return_counts=True)
            most_common_gap = gaps[np.argmax(counts)]
            inferred_freq = to_offset(pd.to_timedelta(most_common_gap)).freqstr
    except (TypeError, ValueError):
        inferred_freq = default
    return inferred_freq if inferred_freq is not None else default
def get_model_accuracy_dict(nixtla_results_df, metric=r2_score):
    """Score every model column of a nixtla results df against the "y" column.

    Returns a dict mapping each model column name to
    metric(nixtla_results_df["y"], nixtla_results_df[<model column>]).
    """
    # Columns that hold identifiers, timestamps or the actual values, not model output.
    non_model_columns = ("unique_id", "ds", "y", "cutoff")
    return {
        name: metric(nixtla_results_df["y"], nixtla_results_df[name])
        for name in nixtla_results_df.columns
        if name not in non_model_columns
    }
def get_best_model_from_results_df(nixtla_results_df, metric=r2_score):
    """Get the best model, i.e. the one with the highest score, from a results df
    with a column for each nixtla model.

    Args:
        nixtla_results_df: results dataframe with a "y" column and one column
            per model.
        metric: callable(y_true, y_pred) returning a score where higher is better.

    Returns:
        The name of the best-scoring model column, or None when there are no
        model columns or no score is comparable (e.g. all NaN).
    """
    # Start from -inf rather than 0: scores such as R^2 can be negative, and a
    # zero baseline would leave no model selected when every score is <= 0.
    best_model, best_score = None, float("-inf")
    accuracy_dict = get_model_accuracy_dict(nixtla_results_df, metric)
    for model, score in accuracy_dict.items():
        # A NaN score never compares greater, so it is skipped.
        if score > best_score:
            best_model, best_score = model, score
    return best_model
def spec_hierarchy_from_list(col_list):
    """Build the hierarchy spec for a list of hierarchy columns.

    The spec starts with ["Total"] and then adds one entry per column, each
    being "Total" followed by the columns up to and including that one.
    """
    top_level = [["Total"]]
    nested_levels = [
        ["Total"] + col_list[:depth] for depth in range(1, len(col_list) + 1)
    ]
    return top_level + nested_levels
def get_hierarchy_from_df(df, model_args):
    """Extract the hierarchy from the raw df, using the provided spec and args.

    The "hierarchy" model arg is a list of format
        [<level 1>, <level 2>, ..., <level n>]
    where each element is a level in the hierarchy.

    Returns a tuple (nixtla_df, hier_df, hier_dict) where:
        nixtla_df is a dataframe in the format nixtla packages uses for training
        hier_df is a matrix of 0s and 1s showing the hierarchical structure
        hier_dict is a dictionary with the hierarchical structure. See the unit test
        in tests/unit/ml_handlers/test_time_series_utils.py for an example.

    When HierarchicalForecast is not installed a warning is logged and None
    is returned.
    """
    if HierarchicalReconciliation is None:
        log.logger.warning("HierarchicalForecast is not installed, but `get_hierarchy_from_df` has been called. This should never happen.")  # noqa
        return None

    levels = spec_hierarchy_from_list(model_args["hierarchy"])

    column_names = {model_args["order_by"]: "ds", model_args["target"]: "y"}
    training_df = df.rename(column_names, axis=1)
    training_df["ds"] = pd.to_datetime(training_df["ds"])
    for group_column in model_args["group_by"]:
        # grouping columns need to be string format
        training_df[group_column] = training_df[group_column].astype(str)
    # Constant top-level column matching the "Total" entry of every spec level.
    training_df.insert(0, "Total", "total")

    # aggregate returns (nixtla_df, hierarchy_df, hierarchy_dict)
    training_df, structure_df, structure_tags = aggregate(training_df, levels)
    return training_df, structure_df, structure_tags
def reconcile_forecasts(nixtla_df, forecast_df, hierarchy_df, hierarchy_dict):
    """Reconcile forecast results according to the hierarchy.

    Runs HierarchicalReconciliation with the default reconciler and formats
    the output with `get_results_from_reconciled_df`. When HierarchicalForecast
    is not installed a warning is logged and None is returned.
    """
    if HierarchicalReconciliation is None:
        log.logger.warning("HierarchicalForecast is not installed, but `reconcile_forecasts` has been called. This should never happen.")  # noqa
        return None

    reconciliation = HierarchicalReconciliation(reconcilers=[DEFAULT_RECONCILER()])
    reconciled = reconciliation.reconcile(
        Y_hat_df=forecast_df,
        Y_df=nixtla_df,
        S=hierarchy_df,
        tags=hierarchy_dict,
    )
    return get_results_from_reconciled_df(reconciled, hierarchy_df)
def get_results_from_reconciled_df(reconciled_df, hierarchy_df):
    """Format the reconciled df into a normal Nixtla results df.

    First drops the first model output column that hasn't been reconciled
    (the first column other than "ds"/"y" whose name lacks "BottomUp").
    Then drops rows corresponding to higher level predictions that were not
    in the original dataframe, e.g. the total for each grouping, and strips
    the leading "total/" from the remaining index labels.

    Args:
        reconciled_df: reconciliation output, indexed by series id.
        hierarchy_df: hierarchy matrix whose columns are the lowest-level ids.

    Returns:
        A new dataframe restricted to the lowest-level series; the input is
        not modified.
    """
    # Drop unnecessary columns. Start from the input so the function still
    # works when there is no unreconciled column left to remove.
    results_df = reconciled_df
    for col in reconciled_df.columns:
        if col not in ("ds", "y") and "BottomUp" not in col:
            results_df = reconciled_df.drop(col, axis=1)  # removes original forecast column
            break

    # Drop higher-level rows
    is_lowest_level = results_df.index.isin(hierarchy_df.columns)
    results_df = results_df[is_lowest_level]
    # Strip only the leading "total/"; a plain replace would also mangle ids
    # that contain "total/" further in (e.g. "total/subtotal/x").
    results_df.index = results_df.index.str.replace(r"^total/", "", regex=True)
    return results_df