Coverage for mindsdb / integrations / handlers / merlion_handler / adapters.py: 0%

117 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from merlion.models.anomaly.forecast_based.prophet import ProphetDetectorConfig, ProphetDetector 

2from merlion.models.anomaly.isolation_forest import IsolationForestConfig, IsolationForest 

3from merlion.models.anomaly.windstats import WindStatsConfig, WindStats 

4from merlion.models.automl.autoprophet import AutoProphetConfig, AutoProphet 

5from merlion.models.automl.autosarima import AutoSarimaConfig, AutoSarima 

6from merlion.models.forecast.smoother import MSESConfig, MSES 

7from merlion.post_process.threshold import AggregateAlarms 

8from merlion.transform.moving_average import DifferenceTransform 

9from scipy.stats import norm 

10 

11from merlion.models.defaults import DefaultDetectorConfig, DefaultDetector, DefaultForecaster, DefaultForecasterConfig 

12from merlion.utils import TimeSeries 

13 

14from enum import Enum 

15import pandas as pd 

16 

17 

class MerlionArguments(Enum):
    """Names of the keyword arguments understood by the Merlion adapters.

    Each member's value is the literal key that is looked up in an adapter's
    ``**kwargs``.
    """

    # Arguments shared by the forecasting adapters.
    target_seq_index = "target_seq_index"
    max_forecast_steps = "max_forecast_steps"
    max_backstep = "max_backstep"

    # Arguments read by the anomaly-detector adapters.
    wind_sz = "wind_sz"
    alm_threshold = "alm_threshold"

    # Iteration limit argument.
    maxiter = "maxiter"

25 

26 

class BaseMerlionForecastAdapter:
    """Thin wrapper that drives a Merlion forecasting model with pandas data.

    This base class only stores the tuning arguments and leaves ``self.model``
    as ``None``; a subclass must assign a concrete model before ``train``,
    ``predict``, ``to_bytes`` or ``initialize_model`` can be used.
    """

    # Column position the target is moved to by ``to_train_dataframe``.
    TARGET_SEQ_INDEX = 0
    # Fallbacks used when the matching keyword argument is not supplied.
    DEFAULT_MAX_FORECAST_STEPS = 100
    DEFAULT_MAX_BACKSTEP = 60
    DEFAULT_MAXITER = 5

    def __init__(self, **kwargs):
        """Read the optional tuning arguments from ``kwargs``.

        Recognised keys are ``max_forecast_steps``, ``max_backstep`` and
        ``maxiter``; any key that is absent falls back to the class default.
        """
        self.max_forecast_steps = kwargs.get(MerlionArguments.max_forecast_steps.value, self.DEFAULT_MAX_FORECAST_STEPS)
        self.max_backstep = kwargs.get(MerlionArguments.max_backstep.value, self.DEFAULT_MAX_BACKSTEP)
        self.maxiter = kwargs.get(MerlionArguments.maxiter.value, self.DEFAULT_MAXITER)
        self.model = None

    def to_bytes(self) -> bytes:
        """Return the wrapped model serialised by its own ``to_bytes``."""
        return self.model.to_bytes()

    def initialize_model(self, bytes):
        """Replace ``self.model`` with the model restored from ``bytes``.

        The existing ``self.model`` is used to call ``from_bytes``, so a model
        must already be assigned.  The parameter keeps its original name (it
        shadows the builtin) so keyword callers are not broken.
        """
        self.model = self.model.from_bytes(bytes)

    def to_train_dataframe(self, df: pd.DataFrame, target: str) -> pd.DataFrame:
        """Return ``df`` with ``target`` moved to position ``TARGET_SEQ_INDEX``.

        Raises:
            ValueError: if ``target`` is not one of the columns of ``df``.
        """
        columns = list(df.columns.values)
        if target not in columns:
            # Same exception type list.remove() would raise, but with a
            # message that names the missing column.
            raise ValueError(f"Target column {target!r} not found in dataframe columns {columns}")
        columns.remove(target)
        columns.insert(self.TARGET_SEQ_INDEX, target)
        return df[columns]

    def train(self, df: pd.DataFrame, target: str):
        """Fit the wrapped model on ``df`` with ``target`` as the leading column."""
        df = self.to_train_dataframe(df=df, target=target)
        train_data = TimeSeries.from_pd(df)
        self.model.train(train_data=train_data)

    def predict(self, df: pd.DataFrame, target: str) -> pd.DataFrame:
        """Forecast ``target`` for the timestamps found in the index of ``df``.

        Rows whose index lies after
        ``last_train_time + timedelta * max_forecast_steps`` are dropped
        before forecasting.
        """
        horizon_end = self.model.last_train_time + self.model.timedelta * self.max_forecast_steps
        # Copy the filtered slice: a placeholder column may be written below
        # and must neither touch the caller's frame nor raise pandas'
        # SettingWithCopyWarning.
        df = df[df.index <= horizon_end].copy()
        if df.columns.empty:
            # Only timestamps were supplied; add a placeholder column so the
            # frame can be converted into a TimeSeries.
            df.loc[:, target] = 0
        predict_data = TimeSeries.from_pd(df)
        predict_pred, predict_err = self.model.forecast(time_stamps=predict_data.time_stamps)
        return self.__prepare_forecast_return(target=target, pred_ts=predict_pred, err_ts=predict_err)

    def __prepare_forecast_return(self, target: str, pred_ts: "TimeSeries", err_ts: "TimeSeries") -> pd.DataFrame:
        """Build the result frame with ``<target>__upper`` / ``<target>__lower``.

        The bounds are the prediction plus a spread scaled by the 97.5% and
        2.5% standard-normal quantiles.  The spread is the ``<target>_err``
        column of the error series when one is usable; otherwise it is the
        standard deviation of the predictions themselves.
        """
        upper_q = norm.ppf(0.975)
        lower_q = norm.ppf(0.025)

        pred_df: pd.DataFrame = pred_ts.to_pd()
        err_df = None if err_ts is None else err_ts.to_pd()

        if err_df is None or target in list(err_df.columns.values):  # error and predict sometimes are same
            std = pred_df[target].std()
            pred_df[f"{target}__upper"] = pred_df[f"{target}"] + std * upper_q
            pred_df[f"{target}__lower"] = pred_df[f"{target}"] + std * lower_q
        else:
            pred_df = pred_df.join(err_df, how="left")
            pred_df[f"{target}__upper"] = pred_df[f"{target}"] + upper_q * err_df[f"{target}_err"]
            pred_df[f"{target}__lower"] = pred_df[f"{target}"] + lower_q * err_df[f"{target}_err"]
            # The joined error column was only needed to compute the bounds.
            pred_df.drop(columns=[f"{target}_err"], inplace=True)
        return pred_df

82 

83 

class DefaultForecasterAdapter(BaseMerlionForecastAdapter):
    """Adapter whose model is Merlion's ``DefaultForecaster``.

    reference: https://opensource.salesforce.com/Merlion/latest/merlion.models.html#merlion.models.defaults.DefaultForecaster
    """

    def __init__(self, **kwargs):
        """Parse the shared arguments, then build the default forecaster."""
        super().__init__(**kwargs)
        forecaster_config = DefaultForecasterConfig(
            max_forecast_steps=self.max_forecast_steps,
            target_seq_index=self.TARGET_SEQ_INDEX,
        )
        self.model = DefaultForecaster(forecaster_config)

92 

93 

class SarimaForecasterAdapter(BaseMerlionForecastAdapter):
    """Adapter whose model is Merlion's ``AutoSarima``.

    reference: https://opensource.salesforce.com/Merlion/latest/merlion.models.automl.html#module-merlion.models.automl.autosarima
    """

    def __init__(self, **kwargs):
        """Parse the shared arguments, then build the AutoSarima model.

        The ``maxiter`` keyword argument is forwarded to the model
        configuration; when it is omitted the base-class default (5) is used.
        """
        super(SarimaForecasterAdapter, self).__init__(**kwargs)
        # Use the parsed self.maxiter rather than a literal so a caller's
        # ``maxiter`` argument is no longer silently ignored.
        config = AutoSarimaConfig(auto_pqPQ=True, auto_d=True, auto_D=True, auto_seasonality=True,
                                  approximation=True, maxiter=self.maxiter)
        self.model = AutoSarima(config)

103 

104 

class ProphetForecasterAdapter(BaseMerlionForecastAdapter):
    """Adapter whose model is Merlion's ``AutoProphet``.

    reference: https://opensource.salesforce.com/Merlion/latest/merlion.models.automl.html#module-merlion.models.automl.autoprophet
    """

    def __init__(self, **kwargs):
        """Parse the shared arguments, then build the AutoProphet model."""
        super().__init__(**kwargs)
        prophet_config = AutoProphetConfig(max_forecast_steps=self.max_forecast_steps)
        self.model = AutoProphet(prophet_config)

112 

113 

class MSESForecasterAdapter(BaseMerlionForecastAdapter):
    """Adapter whose model is Merlion's ``MSES`` forecaster.

    reference: https://opensource.salesforce.com/Merlion/latest/merlion.models.forecast.html#merlion.models.forecast.smoother.MSES
    """

    def __init__(self, **kwargs):
        """Parse the shared arguments, then build the MSES model."""
        super().__init__(**kwargs)
        mses_config = MSESConfig(
            max_forecast_steps=self.max_forecast_steps,
            target_seq_index=self.TARGET_SEQ_INDEX,
            max_backstep=self.max_backstep,
        )
        self.model = MSES(mses_config)

123 

124 

class BaseMerlineDetectorAdapter(BaseMerlionForecastAdapter):
    """Shared behaviour for the anomaly-detector adapters.

    Training and prediction use only the ``target`` column of the supplied
    frame.  As with the parent class, a subclass must assign ``self.model``.
    """

    # Fallbacks used when the matching keyword argument is not supplied.
    DEFAULT_WIND_SZ = 60
    DEFAULT_ALM_THRESHOLD = 4

    def __init__(self, **kwargs):
        """Parse the shared arguments plus ``wind_sz`` and ``alm_threshold``."""
        super().__init__(**kwargs)
        wind_sz_key = MerlionArguments.wind_sz.value
        alm_threshold_key = MerlionArguments.alm_threshold.value
        self.wind_sz = kwargs.get(wind_sz_key, self.DEFAULT_WIND_SZ)
        self.alm_threshold = kwargs.get(alm_threshold_key, self.DEFAULT_ALM_THRESHOLD)

    def train(self, df: pd.DataFrame, target: str):
        """Fit the wrapped detector on the ``target`` column of ``df``."""
        ordered = self.to_train_dataframe(df=df, target=target)
        target_series = TimeSeries.from_pd(ordered[target])
        self.model.train(train_data=target_series)

    def predict(self, df: pd.DataFrame, target: str) -> pd.DataFrame:
        """Return the model's anomaly labels for the ``target`` column.

        The returned frame is the label series converted to pandas, with its
        ``anom_score`` column additionally exposed as
        ``<target>__anomaly_score``.
        """
        target_series = TimeSeries.from_pd(df[target])
        labels = self.model.get_anomaly_label(time_series=target_series)
        result = labels.to_pd()
        result[f"{target}__anomaly_score"] = result["anom_score"]
        return result

145 

146 

class DefaultDetectorAdapter(BaseMerlineDetectorAdapter):
    """Adapter whose model is Merlion's ``DefaultDetector``.

    reference: https://opensource.salesforce.com/Merlion/latest/merlion.models.html#merlion.models.defaults.DefaultDetector
    """

    def __init__(self, **kwargs):
        """Parse the shared arguments, then build the default detector."""
        super().__init__(**kwargs)
        detector_config = DefaultDetectorConfig()
        self.model = DefaultDetector(detector_config)

154 

155 

class IsolationForestDetectorAdapter(BaseMerlineDetectorAdapter):
    """Adapter whose model is Merlion's ``IsolationForest`` detector.

    reference: https://opensource.salesforce.com/Merlion/latest/merlion.models.anomaly.html#merlion.models.anomaly.isolation_forest.IsolationForest
    """

    def __init__(self, **kwargs):
        """Parse the shared arguments, then build the isolation forest."""
        super().__init__(**kwargs)
        forest_config = IsolationForestConfig()
        self.model = IsolationForest(forest_config)

163 

164 

class WindStatsDetectorAdapter(BaseMerlineDetectorAdapter):
    """Adapter whose model is Merlion's ``WindStats`` detector.

    reference: https://opensource.salesforce.com/Merlion/latest/merlion.models.anomaly.html#merlion.models.anomaly.windstats.WindStats
    """

    def __init__(self, **kwargs):
        """Parse the arguments, then build WindStats from ``wind_sz`` and ``alm_threshold``."""
        super().__init__(**kwargs)
        alarm_rule = AggregateAlarms(alm_threshold=self.alm_threshold)
        windstats_config = WindStatsConfig(wind_sz=self.wind_sz, threshold=alarm_rule)
        self.model = WindStats(windstats_config)

173 

174 

class ProphetDetectorAdapter(BaseMerlineDetectorAdapter):
    """Adapter whose model is Merlion's ``ProphetDetector``.

    reference: https://opensource.salesforce.com/Merlion/latest/merlion.models.anomaly.forecast_based.html#merlion.models.anomaly.forecast_based.prophet.ProphetDetector
    """

    def __init__(self, **kwargs):
        """Parse the shared arguments, then build the detector with a ``DifferenceTransform``."""
        super().__init__(**kwargs)
        difference = DifferenceTransform()
        detector_config = ProphetDetectorConfig(transform=difference)
        self.model = ProphetDetector(detector_config)