Coverage for mindsdb / integrations / handlers / merlion_handler / adapters.py: 0%
117 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from merlion.models.anomaly.forecast_based.prophet import ProphetDetectorConfig, ProphetDetector
2from merlion.models.anomaly.isolation_forest import IsolationForestConfig, IsolationForest
3from merlion.models.anomaly.windstats import WindStatsConfig, WindStats
4from merlion.models.automl.autoprophet import AutoProphetConfig, AutoProphet
5from merlion.models.automl.autosarima import AutoSarimaConfig, AutoSarima
6from merlion.models.forecast.smoother import MSESConfig, MSES
7from merlion.post_process.threshold import AggregateAlarms
8from merlion.transform.moving_average import DifferenceTransform
9from scipy.stats import norm
11from merlion.models.defaults import DefaultDetectorConfig, DefaultDetector, DefaultForecaster, DefaultForecasterConfig
12from merlion.utils import TimeSeries
14from enum import Enum
15import pandas as pd
class MerlionArguments(Enum):
    """Names of the optional keyword arguments understood by the adapters.

    Each member's value is the literal key that the adapter constructors
    look up in their ``**kwargs``.
    """

    # Index of the target series; the adapters in this module use a fixed
    # class constant rather than reading this key from kwargs.
    target_seq_index = "target_seq_index"

    # Forecaster options.
    max_forecast_steps = "max_forecast_steps"
    max_backstep = "max_backstep"

    # Anomaly-detector options.
    wind_sz = "wind_sz"
    alm_threshold = "alm_threshold"

    # Iteration cap option.
    maxiter = "maxiter"
class BaseMerlionForecastAdapter:
    """Shared plumbing for adapters that wrap a Merlion forecasting model.

    Subclasses are expected to create the concrete Merlion model in their
    ``__init__`` and store it in ``self.model``; this base class only keeps
    the hyper-parameters and implements (de)serialization, training and
    prediction on top of that model.
    """

    # Position the target column is moved to before training.
    TARGET_SEQ_INDEX = 0
    DEFAULT_MAX_FORECAST_STEPS = 100
    DEFAULT_MAX_BACKSTEP = 60
    DEFAULT_MAXITER = 5

    def __init__(self, **kwargs):
        """Store optional hyper-parameters, falling back to class defaults.

        Recognized keys are ``max_forecast_steps``, ``max_backstep`` and
        ``maxiter``; any other key is ignored here.
        """
        self.max_forecast_steps = kwargs.get(
            MerlionArguments.max_forecast_steps.value, self.DEFAULT_MAX_FORECAST_STEPS
        )
        self.max_backstep = kwargs.get(
            MerlionArguments.max_backstep.value, self.DEFAULT_MAX_BACKSTEP
        )
        self.maxiter = kwargs.get(MerlionArguments.maxiter.value, self.DEFAULT_MAXITER)
        # Concrete subclasses replace this with a Merlion model instance.
        self.model = None

    def to_bytes(self) -> bytes:
        """Return the wrapped model serialized via its own ``to_bytes``."""
        return self.model.to_bytes()

    def initialize_model(self, bytes):
        """Replace ``self.model`` with the model restored from ``bytes``.

        The parameter name shadows the builtin ``bytes``; it is kept as-is so
        callers passing it by keyword keep working.
        """
        self.model = self.model.from_bytes(bytes)

    def to_train_dataframe(self, df: pd.DataFrame, target: str) -> pd.DataFrame:
        """Return ``df`` with ``target`` moved to ``TARGET_SEQ_INDEX``.

        Raises:
            ValueError: if ``target`` is not one of ``df``'s columns.
        """
        columns = list(df.columns.values)
        columns.remove(target)
        columns.insert(self.TARGET_SEQ_INDEX, target)
        return df[columns]

    def train(self, df: pd.DataFrame, target: str):
        """Train the wrapped model on ``df`` with ``target`` as first column."""
        df = self.to_train_dataframe(df=df, target=target)
        train_data = TimeSeries.from_pd(df)
        self.model.train(train_data=train_data)

    def predict(self, df: pd.DataFrame, target: str) -> pd.DataFrame:
        """Forecast ``target`` for the timestamps in ``df``'s index.

        Rows whose index lies beyond ``max_forecast_steps`` model time-steps
        after the last training timestamp are dropped before forecasting.

        Returns:
            A frame with the forecast in column ``target`` plus
            ``{target}__upper`` and ``{target}__lower`` bounds.
        """
        horizon_end = self.model.last_train_time + self.model.timedelta * self.max_forecast_steps
        df = df[df.index <= horizon_end]
        if df.shape[1] == 0:
            # Only the index (timestamps) matters for forecasting, but the
            # frame needs at least one column to be converted.  Work on an
            # explicit copy: assigning into the filtered result directly can
            # trip pandas' chained-assignment check.
            df = df.copy()
            df[target] = 0
        predict_data = TimeSeries.from_pd(df)
        predict_pred, predict_err = self.model.forecast(time_stamps=predict_data.time_stamps)
        return self.__prepare_forecast_return(target=target, pred_ts=predict_pred, err_ts=predict_err)

    def __prepare_forecast_return(self, target: str, pred_ts: TimeSeries, err_ts: TimeSeries) -> pd.DataFrame:
        """Convert forecast output into a frame with a 95% interval.

        When a separate error series is available its ``{target}_err`` column
        is used as the standard error; otherwise the standard deviation of
        the predictions themselves is used as a fallback.
        """
        pred_df: pd.DataFrame = pred_ts.to_pd()
        err_df = None if err_ts is None else err_ts.to_pd()

        # Two-sided 95% interval: 2.5% and 97.5% quantiles of N(0, 1).
        z_upper = norm.ppf(0.975)
        z_lower = norm.ppf(0.025)
        upper_col = f"{target}__upper"
        lower_col = f"{target}__lower"

        # The error series sometimes carries the same column as the
        # prediction (per the original author's note); treat that like a
        # missing error series.
        if err_df is None or target in list(err_df.columns.values):
            std = pred_df[target].std()
            pred_df[upper_col] = pred_df[target] + std * z_upper
            pred_df[lower_col] = pred_df[target] + std * z_lower
        else:
            err_col = f"{target}_err"
            pred_df = pred_df.join(err_df, how="left")
            pred_df[upper_col] = pred_df[target] + z_upper * err_df[err_col]
            pred_df[lower_col] = pred_df[target] + z_lower * err_df[err_col]
            # The raw error column was only needed to derive the bounds.
            pred_df.drop(columns=[err_col], inplace=True)
        return pred_df
class DefaultForecasterAdapter(BaseMerlionForecastAdapter):
    """Forecast adapter backed by Merlion's ``DefaultForecaster``.

    Reference:
    https://opensource.salesforce.com/Merlion/latest/merlion.models.html#merlion.models.defaults.DefaultForecaster
    """

    def __init__(self, **kwargs):
        """Build a ``DefaultForecaster`` from the stored forecast horizon."""
        super().__init__(**kwargs)
        forecaster_config = DefaultForecasterConfig(
            max_forecast_steps=self.max_forecast_steps,
            target_seq_index=self.TARGET_SEQ_INDEX,
        )
        self.model = DefaultForecaster(forecaster_config)
class SarimaForecasterAdapter(BaseMerlionForecastAdapter):
    """Forecast adapter backed by Merlion's ``AutoSarima``.

    Reference:
    https://opensource.salesforce.com/Merlion/latest/merlion.models.automl.html#module-merlion.models.automl.autosarima
    """

    def __init__(self, **kwargs):
        """Build an ``AutoSarima`` model with automatic order selection.

        The ``maxiter`` keyword argument (default ``DEFAULT_MAXITER``, i.e. 5)
        is forwarded to the model configuration.
        """
        super().__init__(**kwargs)
        config = AutoSarimaConfig(
            auto_pqPQ=True,
            auto_d=True,
            auto_D=True,
            auto_seasonality=True,
            approximation=True,
            # Previously hard-coded to 5, which silently ignored the
            # caller-supplied ``maxiter``; the default is still 5.
            maxiter=self.maxiter,
        )
        self.model = AutoSarima(config)
class ProphetForecasterAdapter(BaseMerlionForecastAdapter):
    """Forecast adapter backed by Merlion's ``AutoProphet``.

    Reference:
    https://opensource.salesforce.com/Merlion/latest/merlion.models.automl.html#module-merlion.models.automl.autoprophet
    """

    def __init__(self, **kwargs):
        """Build an ``AutoProphet`` model limited to the stored horizon."""
        super().__init__(**kwargs)
        prophet_config = AutoProphetConfig(max_forecast_steps=self.max_forecast_steps)
        self.model = AutoProphet(prophet_config)
class MSESForecasterAdapter(BaseMerlionForecastAdapter):
    """Forecast adapter backed by Merlion's ``MSES`` smoother.

    Reference:
    https://opensource.salesforce.com/Merlion/latest/merlion.models.forecast.html#merlion.models.forecast.smoother.MSES
    """

    def __init__(self, **kwargs):
        """Build an ``MSES`` model from the stored horizon and back-step."""
        super().__init__(**kwargs)
        mses_config = MSESConfig(
            max_forecast_steps=self.max_forecast_steps,
            target_seq_index=self.TARGET_SEQ_INDEX,
            max_backstep=self.max_backstep,
        )
        self.model = MSES(mses_config)
class BaseMerlineDetectorAdapter(BaseMerlionForecastAdapter):
    """Shared plumbing for adapters that wrap a Merlion anomaly detector.

    Adds the ``wind_sz`` and ``alm_threshold`` options on top of the
    forecaster options and overrides training/prediction so that only the
    target column is fed to the model.
    """

    DEFAULT_WIND_SZ = 60
    DEFAULT_ALM_THRESHOLD = 4

    def __init__(self, **kwargs):
        """Store detector options, falling back to the class defaults."""
        super().__init__(**kwargs)
        wind_sz_key = MerlionArguments.wind_sz.value
        threshold_key = MerlionArguments.alm_threshold.value
        self.wind_sz = kwargs.get(wind_sz_key, self.DEFAULT_WIND_SZ)
        self.alm_threshold = kwargs.get(threshold_key, self.DEFAULT_ALM_THRESHOLD)

    def train(self, df: pd.DataFrame, target: str):
        """Train the wrapped detector on the ``target`` column of ``df``."""
        ordered = self.to_train_dataframe(df=df, target=target)
        target_series = TimeSeries.from_pd(ordered[target])
        self.model.train(train_data=target_series)

    def predict(self, df: pd.DataFrame, target: str) -> pd.DataFrame:
        """Label anomalies in the ``target`` column of ``df``.

        Returns:
            The model's label frame with its ``anom_score`` column also
            exposed as ``{target}__anomaly_score``.
        """
        series = TimeSeries.from_pd(df[target])
        labels = self.model.get_anomaly_label(time_series=series)
        result = labels.to_pd()
        score_column = f"{target}__anomaly_score"
        result[score_column] = result["anom_score"]
        return result
class DefaultDetectorAdapter(BaseMerlineDetectorAdapter):
    """Anomaly adapter backed by Merlion's ``DefaultDetector``.

    Reference:
    https://opensource.salesforce.com/Merlion/latest/merlion.models.html#merlion.models.defaults.DefaultDetector
    """

    def __init__(self, **kwargs):
        """Build a ``DefaultDetector`` with its default configuration."""
        super().__init__(**kwargs)
        detector_config = DefaultDetectorConfig()
        self.model = DefaultDetector(detector_config)
class IsolationForestDetectorAdapter(BaseMerlineDetectorAdapter):
    """Anomaly adapter backed by Merlion's ``IsolationForest``.

    Reference:
    https://opensource.salesforce.com/Merlion/latest/merlion.models.anomaly.html#merlion.models.anomaly.isolation_forest.IsolationForest
    """

    def __init__(self, **kwargs):
        """Build an ``IsolationForest`` with its default configuration."""
        super().__init__(**kwargs)
        forest_config = IsolationForestConfig()
        self.model = IsolationForest(forest_config)
class WindStatsDetectorAdapter(BaseMerlineDetectorAdapter):
    """Anomaly adapter backed by Merlion's ``WindStats``.

    Reference:
    https://opensource.salesforce.com/Merlion/latest/merlion.models.anomaly.html#merlion.models.anomaly.windstats.WindStats
    """

    def __init__(self, **kwargs):
        """Build a ``WindStats`` model from ``wind_sz`` and ``alm_threshold``."""
        super().__init__(**kwargs)
        alarm_rule = AggregateAlarms(alm_threshold=self.alm_threshold)
        windstats_config = WindStatsConfig(wind_sz=self.wind_sz, threshold=alarm_rule)
        self.model = WindStats(windstats_config)
class ProphetDetectorAdapter(BaseMerlineDetectorAdapter):
    """Anomaly adapter backed by Merlion's ``ProphetDetector``.

    Reference:
    https://opensource.salesforce.com/Merlion/latest/merlion.models.anomaly.forecast_based.html#merlion.models.anomaly.forecast_based.prophet.ProphetDetector
    """

    def __init__(self, **kwargs):
        """Build a ``ProphetDetector`` that applies a ``DifferenceTransform``."""
        super().__init__(**kwargs)
        differencing = DifferenceTransform()
        self.model = ProphetDetector(ProphetDetectorConfig(transform=differencing))