Coverage for mindsdb / integrations / handlers / anomaly_detection_handler / anomaly_detection_handler.py: 0%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb.integrations.libs.base import BaseMLEngine 

2import pandas as pd 

3from mindsdb.integrations.handlers.anomaly_detection_handler.utils import ( 

4 train_unsupervised, 

5 train_supervised, 

6 train_semisupervised, 

7) 

8from joblib import dump, load 

9from pyod.models.ecod import ECOD # unsupervised default 

10from pyod.models.xgbod import XGBOD # semi-supervised default 

11from catboost import CatBoostClassifier # supervised default 

12from pyod.models.lof import LOF 

13from pyod.models.knn import KNN 

14from pyod.models.pca import PCA 

15from xgboost import XGBClassifier 

16from sklearn.naive_bayes import GaussianNB 

17from sklearn.preprocessing import StandardScaler 

18import numpy as np 

19import os 

20 

21 

# Registry of default anomaly-detection estimators, keyed first by learning
# setting ("supervised" / "semi-supervised" / "unsupervised") and then by a
# short model name used in `model_name` / `ensemble_models` args.
# NOTE(review): these are module-level estimator *instances* shared across all
# trainings, not factories — fitting one mutates the shared object. Confirm the
# train_* helpers in utils clone or re-instantiate before fitting.
MODELS = {
    "supervised": {
        "catboost": CatBoostClassifier(logging_level="Silent"),  # silent: avoid noisy per-iteration training logs
        "xgb": XGBClassifier(),
        "nb": GaussianNB(),
    },
    "semi-supervised": {
        # XGBOD augments the labeled features with unsupervised outlier scores
        # from its estimator list (ECOD here) before fitting XGBoost.
        "xgbod": XGBOD(estimator_list=[ECOD()]),
    },
    "unsupervised": {
        "ecod": ECOD(),
        "knn": KNN(),
        "pca": PCA(),
        "lof": LOF(),
    },
}

38 

39 

def choose_model_type(training_df, model_type=None, target=None, supervised_threshold=3000):
    """Choose the model type based on the presence of labels and size of the dataset"""
    if model_type is None:
        # No explicit choice: infer from whether labels exist and how much data we have.
        if target is None:
            model_type = "unsupervised"
        elif len(training_df) > supervised_threshold:
            model_type = "supervised"
        else:
            model_type = "semi-supervised"
    valid_types = ("supervised", "semi-supervised", "unsupervised")
    assert model_type in valid_types, "model type must be 'supervised', 'semi-supervised', or 'unsupervised'"
    return model_type

53 

54 

def choose_model(df, model_name=None, model_type=None, target=None, supervised_threshold=3000):
    """Choose the best model based on the size of the dataset and the model type"""
    training_df = preprocess_data(df)
    model_type = choose_model_type(training_df, model_type, target, supervised_threshold)
    # Resolve an explicit model choice; None lets the train_* helper use its default.
    model = None
    if model_name is not None:
        assert model_name in MODELS[model_type], f"model name must be one of {list(MODELS[model_type].keys())}"
        model = MODELS[model_type][model_name]
    if model_type == "unsupervised":
        # No labels: fit on the full preprocessed frame.
        return train_unsupervised(training_df, model=model)
    # Labeled settings: split features from the (integer-cast) label column.
    features = training_df.drop(target, axis=1)
    labels = training_df[target].astype(int)
    if model_type == "supervised":
        return train_supervised(features, labels, model=model)
    elif model_type == "semi-supervised":
        return train_semisupervised(features, labels)  # Only one semi-supervised model available

73 

74 

def anomaly_type_to_model_name(anomaly_type):
    """Choose the best model name based on the anomaly type"""
    # Map each anomaly category to the detector best suited to find it.
    mapping = {
        "local": "lof",
        "global": "knn",
        "clustered": "pca",
        "dependency": "knn",
    }
    assert anomaly_type in mapping, "anomaly type must be 'local', 'global', 'clustered', or 'dependency'"
    return mapping[anomaly_type]

90 

91 

def preprocess_data(df):
    """Preprocess the data by one-hot encoding categorical columns and scaling numeric columns.

    Args:
        df: input DataFrame; may mix object (categorical) and numeric columns.
            The input is NOT modified.

    Returns:
        A new DataFrame with categorical columns one-hot encoded and numeric
        columns standardized (zero mean, unit variance).
    """
    # Work on a copy: the previous implementation wrote the category codes back
    # into the caller's frame, so a second call on the same df (e.g. once per
    # ensemble member at prediction time) saw already-encoded integer columns,
    # skipped the dummy encoding, and produced differently-preprocessed features.
    df = df.copy()
    # one-hot encode categorical columns (object -> category codes -> dummies)
    categorical_columns = list(df.select_dtypes(include=["object"]).columns.values)
    df[categorical_columns] = df[categorical_columns].astype("category")
    df[categorical_columns] = df[categorical_columns].apply(lambda x: x.cat.codes)
    df = pd.get_dummies(df, columns=categorical_columns)
    # scale numeric columns to zero mean / unit variance
    numeric_columns = list(df.select_dtypes(include=["number"]).columns.values)

    scaler = StandardScaler()
    df[numeric_columns] = scaler.fit_transform(df[numeric_columns])
    return df

104 

105 

def get_model_names(using_args):
    """Get the model names from the using_args. Model names is a list of model names to train.

    If the model is not an ensemble, it only contains one model.
    Precedence (highest wins): "ensemble_models" > "model_name" > "anomaly_type".
    Returns [None] when nothing is specified, which lets downstream code fall
    back to the default model for the chosen model type.
    """
    model_names = anomaly_type_to_model_name(using_args["anomaly_type"]) if "anomaly_type" in using_args else None
    model_names = using_args.get("model_name", model_names)
    model_names = using_args.get("ensemble_models", model_names)
    # Normalize a missing or single (string) model spec to a one-element list;
    # isinstance (not `type(...) is str`) also accepts str subclasses.
    if model_names is None or isinstance(model_names, str):
        model_names = [model_names]
    return model_names

115 

116 

class AnomalyDetectionHandler(BaseMLEngine):
    """Integration with the PyOD and CatBoost libraries for
    anomaly detection. Both supervised and unsupervised.
    """

    name = "anomaly_detection"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.generative = True  # makes unsupervised learning work

    def create(self, target, df, args=None):
        """Train one model per requested name and save each to the model storage.

        Args:
            target: label column name, or None for unsupervised training.
            df: training DataFrame.
            args: handler args; model selection comes from args["using"]
                (keys: "type", "model_name", "ensemble_models", "anomaly_type").
        """
        args = {} if args is None else args
        using_args = args["using"]  # NOTE(review): assumes the caller always supplies "using" — confirm upstream
        model_type = using_args["type"] if "type" in using_args else None

        model_names = get_model_names(using_args)

        model_save_paths = []
        model_targets = []
        model_class_names = []
        base_path = self.model_storage.folder_get("context")
        for model_name in model_names:
            model = choose_model(df, model_name=model_name, model_type=model_type, target=target)
            this_model_target = "outlier" if target is None else target  # output column name for unsupervised learning
            save_path = "model.joblib" if model_name is None else model_name + ".joblib"
            dump(model, os.path.join(base_path, save_path))
            model_save_paths.append(save_path)
            model_targets.append(this_model_target)
            model_class_names.append(model.__class__.__name__)
        model_args = {"model_path": model_save_paths, "target": model_targets, "model_name": model_class_names}
        self.model_storage.json_set("model_args", model_args)
        self.model_storage.folder_sync("context")

    def predict(self, df, args=None):
        """Load the saved model(s) and return 0/1 predictions (majority vote for ensembles)."""
        args = {} if args is None else args
        model_args = self.model_storage.json_get("model_args")
        if "__mindsdb_row_id" in df.columns:
            df = df.drop("__mindsdb_row_id", axis=1)
        if model_args["target"][0] in df.columns:
            df = df.drop(model_args["target"], axis=1)
        base_path = self.model_storage.folder_get("context")
        # Preprocess ONCE, outside the loop: every ensemble member must see
        # identical features. Re-running preprocess_data per model was redundant
        # and, because it encoded categorical columns in place on `df`, fed each
        # subsequent model differently-preprocessed input.
        predict_df = preprocess_data(df).astype(float)
        results_list = []
        for model_path in model_args["model_path"]:
            model = load(os.path.join(base_path, model_path))
            results_list.append(model.predict(predict_df))
        # Average the member votes, then threshold at 0.5 (majority vote).
        final_results = np.array(results_list).mean(axis=0)
        final_results = np.where(final_results > 0.5, 1, 0)
        return pd.DataFrame({model_args["target"][0]: final_results})

    def describe(self, attribute="model"):
        """Describe the stored models; only attribute == "model" is supported."""
        model_args = self.model_storage.json_get("model_args")
        df = pd.DataFrame({"model_name": [], "target": []})
        if attribute == "model":
            # One row per trained model: (model class name, target column).
            for model_name, target in zip(model_args["model_name"], model_args["target"]):
                df2 = pd.DataFrame({"model_name": model_name, "target": target}, index=[0])
                df = pd.concat([df, df2], ignore_index=True)
            return df
        else:
            raise NotImplementedError(f"attribute {attribute} not implemented")