Coverage for mindsdb / integrations / utilities / time_series_utils.py: 0%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import numpy as np 

2import pandas as pd 

3from pandas.tseries.frequencies import to_offset 

4from sklearn.metrics import r2_score 

5 

6# handle optional dependency 

7try: 

8 import hierarchicalforecast # noqa: F401 

9 from hierarchicalforecast.core import HierarchicalReconciliation 

10 from hierarchicalforecast.methods import BottomUp 

11 from hierarchicalforecast.utils import aggregate 

12except (ImportError, SystemError): 

13 HierarchicalReconciliation = None 

14 BottomUp = None 

15 aggregate = None 

16 

17from mindsdb.utilities import log 

18 

# Fallback frequency string used as the default of `infer_frequency` when no
# frequency can be inferred (presumably the pandas "daily" offset alias).
DEFAULT_FREQUENCY = "D"
# Reconciler class instantiated by `reconcile_forecasts`. It is None when the
# optional hierarchicalforecast import above failed.
DEFAULT_RECONCILER = BottomUp

21 

22 

def transform_to_nixtla_df(df, settings_dict, exog_vars=None):
    """Transform dataframes into the specific format required by StatsForecast.

    Nixtla packages require dataframes to have the following columns:
        unique_id -> the grouping column. If multiple groups are specified then
        we join them into one name using a / char.
        ds -> the date series
        y -> the target variable for prediction

    You can optionally include exogenous regressors after these three columns, but
    they must be numeric.

    Args:
        df: input dataframe; it is copied and never modified.
        settings_dict: mapping with the keys "frequency", "group_by" (list of
            column names, may be empty), "order_by" and "target".
        exog_vars: optional list of exogenous regressor column names.

    Returns:
        A dataframe with the columns ["unique_id", "ds", "y"] followed by the
        exogenous columns. When there is no usable grouping column every row
        gets the unique_id '1'.
    """
    # A fresh list per call: a mutable default would be shared between calls.
    exog_vars = list(exog_vars) if exog_vars is not None else []
    group_by = settings_dict["group_by"] or []
    target = settings_dict["target"]
    order_by = settings_dict["order_by"]
    freq = settings_dict["frequency"]

    nixtla_df = df.copy()

    # Resample every group
    if group_by and group_by != ["__group_by"]:
        # Resample the regressors together with the target so they keep their
        # values instead of turning into all-NaN columns.
        value_cols = [target] + [col for col in exog_vars if col != target]
        resampled_groups = []
        for group, groupdf in nixtla_df.groupby(by=group_by):
            # Older pandas versions yield a scalar key for a single grouping
            # column; zipping a scalar string would iterate its characters.
            if not isinstance(group, tuple):
                group = (group,)
            groupdf = groupdf.copy()  # never write into the groupby slice
            groupdf.index = pd.to_datetime(groupdf.pop(order_by))
            resampled_groupdf = groupdf[value_cols].resample(freq).mean()
            for k, v in zip(group_by, group):
                resampled_groupdf[k] = v
            resampled_groups.append(resampled_groupdf.reset_index())
        # Concatenate once instead of growing a dataframe inside the loop.
        if resampled_groups:
            nixtla_df = pd.concat(resampled_groups)
        else:
            nixtla_df = pd.DataFrame(columns=nixtla_df.columns)

    # Rename columns to statsforecast names
    rename_map = {target: "y", order_by: "ds"}

    # Transform group columns into single unique_id column
    if len(group_by) > 1:
        for col in group_by:
            nixtla_df[col] = nixtla_df[col].astype(str)
        nixtla_df["unique_id"] = nixtla_df[group_by].agg("/".join, axis=1)
    elif len(group_by) == 1:
        rename_map[group_by[0]] = "unique_id"

    nixtla_df = nixtla_df.rename(rename_map, axis=1)

    if "unique_id" not in nixtla_df.columns:
        # add to dataframe as it is expected by statsforecast
        nixtla_df["unique_id"] = '1'

    columns_to_keep = ["unique_id", "ds", "y"] + exog_vars
    nixtla_df["ds"] = pd.to_datetime(nixtla_df["ds"])
    return nixtla_df[columns_to_keep]

71 

72 

def get_results_from_nixtla_df(nixtla_df, model_args):
    """Convert a StatsForecast-style dataframe back to the caller's column layout.

    The "unique_id" values are expanded back into the columns named in
    ``model_args["group_by"]`` (split on "/" when there are several of them),
    the "unique_id" column is removed and "ds" is renamed to
    ``model_args["order_by"]``.
    """
    # If unique_id is already a column the current index is discarded;
    # otherwise the index is turned into a column by reset_index.
    id_is_column = "unique_id" in nixtla_df.columns
    return_df = nixtla_df.reset_index(drop=id_is_column)

    group_cols = model_args["group_by"]
    if len(group_cols) > 1:
        # Several grouping columns were joined with "/"; take them apart again.
        for position, col_name in enumerate(group_cols):
            return_df[col_name] = return_df["unique_id"].apply(
                lambda joined: joined.split("/")[position]
            )
    elif len(group_cols) > 0:
        return_df[group_cols[0]] = return_df["unique_id"]

    without_id = return_df.drop(["unique_id"], axis=1)
    return without_id.rename({"ds": model_args["order_by"]}, axis=1)

88 

89 

def infer_frequency(df, time_column, default=DEFAULT_FREQUENCY):
    """Infer the frequency string of ``df[time_column]``.

    The sorted, de-duplicated timestamps are first passed to ``pd.infer_freq``.
    If that finds nothing, the most common difference between consecutive
    timestamps is converted to an offset alias.

    Args:
        df: dataframe holding the time column.
        time_column: name of the column with the timestamps.
        default: value returned when no frequency can be determined.

    Returns:
        The inferred frequency string, or ``default`` when there are too few
        distinct timestamps or inference fails.
    """
    try:  # infer frequency from time column
        date_series = pd.to_datetime(df.sort_values(by=time_column)[time_column]).unique()
        inferred_freq = None
        # pd.infer_freq raises ValueError when given fewer than 3 dates.
        if len(date_series) >= 3:
            # call this first to get e.g. months & other irregular periods right
            inferred_freq = pd.infer_freq(date_series)
        # A delta needs at least two timestamps; np.argmax fails on an empty diff.
        if inferred_freq is None and len(date_series) >= 2:
            values, counts = np.unique(np.diff(date_series), return_counts=True)
            delta = values[np.argmax(counts)]
            inferred_freq = to_offset(pd.to_timedelta(delta)).freqstr
    except (TypeError, ValueError):
        # Unparseable or otherwise unusable time values: use the default
        # instead of propagating the error.
        inferred_freq = default
    return inferred_freq if inferred_freq is not None else default

101 

102 

def get_model_accuracy_dict(nixtla_results_df, metric=r2_score):
    """Score every model column of a nixtla results dataframe against "y".

    Columns named "unique_id", "ds", "y" or "cutoff" are not treated as model
    outputs. Returns a dict mapping each remaining column name to
    ``metric(y, column)``.
    """
    non_model_columns = ("unique_id", "ds", "y", "cutoff")
    return {
        model_name: metric(nixtla_results_df["y"], nixtla_results_df[model_name])
        for model_name in nixtla_results_df.columns
        if model_name not in non_model_columns
    }

112 

113 

def get_best_model_from_results_df(nixtla_results_df, metric=r2_score):
    """Gets the best model, based on the highest metric value, from a results df
    with a column for each nixtla model.

    ``metric`` is treated as a score where larger is better, as is the case
    for the default ``r2_score``.

    Returns:
        The name of the best scoring model column, or None when there is no
        model column or no score is comparable (e.g. every score is NaN).
    """
    # Start below any real score: R^2 can be negative, so a baseline of 0
    # would return None whenever every model scores <= 0.
    best_model, best_accuracy = None, float("-inf")
    accuracy_dict = get_model_accuracy_dict(nixtla_results_df, metric)
    for model, accuracy in accuracy_dict.items():
        if accuracy > best_accuracy:
            best_model, best_accuracy = model, accuracy
    return best_model

124 

125 

def spec_hierarchy_from_list(col_list):
    """Build the hierarchy spec for a list of hierarchy columns.

    The spec starts with ["Total"] and then holds one entry per level, each
    being "Total" followed by the first 1, 2, ..., n columns of ``col_list``.
    """
    levels = [["Total"]]
    levels.extend(["Total"] + col_list[:depth] for depth in range(1, len(col_list) + 1))
    return levels

132 

133 

def get_hierarchy_from_df(df, model_args):
    """Build the hierarchical training data from the raw dataframe.

    ``model_args["hierarchy"]`` is a list of the form
        [<level 1>, <level 2>, ..., <level n>]
    with one element per hierarchy level.

    Returns a tuple (nixtla_df, hier_df, hier_dict) where:
        nixtla_df is a dataframe in the format nixtla packages use for training
        hier_df is a matrix of 0s and 1s showing the hierarchical structure
        hier_dict is a dictionary with the hierarchical structure. See the unit test
        in tests/unit/ml_handlers/test_time_series_utils.py for an example.

    If HierarchicalForecast is not installed a warning is logged and None is
    returned.
    """
    if HierarchicalReconciliation is None:
        log.logger.warning("HierarchicalForecast is not installed, but `get_hierarchy_from_df` has been called. This should never happen.")  # noqa
        return None

    hierarchy_spec = spec_hierarchy_from_list(model_args["hierarchy"])

    column_renames = {model_args["order_by"]: "ds", model_args["target"]: "y"}
    training_df = df.rename(column_renames, axis=1)
    training_df["ds"] = pd.to_datetime(training_df["ds"])
    # grouping columns need to be string format
    for group_col in model_args["group_by"]:
        training_df[group_col] = training_df[group_col].astype(str)
    # Constant top-level column matching the "Total" entry of the spec.
    training_df.insert(0, "Total", "total")

    # aggregate returns (nixtla_df, hierarchy_df, hierarchy_dict)
    aggregated_df, structure_df, structure_dict = aggregate(training_df, hierarchy_spec)
    return aggregated_df, structure_df, structure_dict

160 

161 

def reconcile_forecasts(nixtla_df, forecast_df, hierarchy_df, hierarchy_dict):
    """Reconcile forecast results according to the hierarchy.

    Runs a HierarchicalReconciliation with the default reconciler and formats
    the outcome with ``get_results_from_reconciled_df``. If HierarchicalForecast
    is not installed a warning is logged and None is returned.
    """
    if HierarchicalReconciliation is None:
        log.logger.warning("HierarchicalForecast is not installed, but `reconcile_forecasts` has been called. This should never happen.")  # noqa
        return None

    reconciliation = HierarchicalReconciliation(reconcilers=[DEFAULT_RECONCILER()])
    reconciled = reconciliation.reconcile(
        Y_hat_df=forecast_df,
        Y_df=nixtla_df,
        S=hierarchy_df,
        tags=hierarchy_dict,
    )
    return get_results_from_reconciled_df(reconciled, hierarchy_df)

171 

172 

def get_results_from_reconciled_df(reconciled_df, hierarchy_df):
    """Formats the reconciled df into a normal Nixtla results df.

    First drops the original (unreconciled) forecast column: the first column
    that is neither "ds" nor "y" and has no "BottomUp" in its name. If no such
    column exists the columns are kept as they are.
    Then drops rows corresponding to higher level predictions that were not
    in the original dataframe, e.g. the total for each grouping.

    Args:
        reconciled_df: reconciliation output, indexed by series id.
        hierarchy_df: hierarchy matrix whose columns are the lowest-level ids.

    Returns:
        A new dataframe holding only the lowest-level rows, with "total/"
        removed from the index labels.
    """
    # Drop unnecessary columns
    # Default to the unchanged frame so a missing unreconciled column does not
    # leave results_df unbound.
    results_df = reconciled_df
    for col in reconciled_df.columns:
        if col in ("ds", "y"):
            continue
        # str() keeps the substring test valid for non-string column labels.
        if "BottomUp" not in str(col):
            results_df = reconciled_df.drop(col, axis=1)  # removes original forecast column
            break

    # Drop higher-level rows
    lowest_level_ids = hierarchy_df.columns
    # Boolean indexing creates a new frame, so the input is never modified.
    results_df = results_df[results_df.index.isin(lowest_level_ids)]
    results_df.index = results_df.index.str.replace("total/", "")
    return results_df