Coverage for mindsdb / integrations / handlers / anomaly_detection_handler / anomaly_detection_handler.py: 0%
109 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb.integrations.libs.base import BaseMLEngine
2import pandas as pd
3from mindsdb.integrations.handlers.anomaly_detection_handler.utils import (
4 train_unsupervised,
5 train_supervised,
6 train_semisupervised,
7)
8from joblib import dump, load
9from pyod.models.ecod import ECOD # unsupervised default
10from pyod.models.xgbod import XGBOD # semi-supervised default
11from catboost import CatBoostClassifier # supervised default
12from pyod.models.lof import LOF
13from pyod.models.knn import KNN
14from pyod.models.pca import PCA
15from xgboost import XGBClassifier
16from sklearn.naive_bayes import GaussianNB
17from sklearn.preprocessing import StandardScaler
18import numpy as np
19import os
# Registry of candidate estimators, keyed first by learning regime and then by
# the short model name accepted in the "model_name" / "ensemble_models"
# USING arguments.
# NOTE(review): these are shared module-level instances, not factories —
# repeated trainings reuse (and may refit) the same objects; confirm this is
# intended.
MODELS = {
    "supervised": {
        "catboost": CatBoostClassifier(logging_level="Silent"),
        "xgb": XGBClassifier(),
        "nb": GaussianNB(),
    },
    "semi-supervised": {
        "xgbod": XGBOD(estimator_list=[ECOD()]),
    },
    "unsupervised": {
        "ecod": ECOD(),
        "knn": KNN(),
        "pca": PCA(),
        "lof": LOF(),
    },
}
def choose_model_type(training_df, model_type=None, target=None, supervised_threshold=3000):
    """Choose the learning regime based on the presence of labels and dataset size.

    Args:
        training_df: preprocessed training data; only its row count is used.
        model_type: explicit override ("supervised", "semi-supervised" or
            "unsupervised"); inferred from `target` and size when None.
        target: name of the label column, or None when the data is unlabeled.
        supervised_threshold: row count above which labeled data is considered
            large enough for fully supervised training.

    Returns:
        One of "supervised", "semi-supervised", "unsupervised".

    Raises:
        ValueError: if an explicit `model_type` is not one of the three regimes.
    """
    if model_type is None:
        if target is None:
            # No labels available -> unsupervised is the only option.
            model_type = "unsupervised"
        else:
            # With labels, go fully supervised only when there is enough data.
            model_type = "supervised" if len(training_df) > supervised_threshold else "semi-supervised"
    if model_type not in ("supervised", "semi-supervised", "unsupervised"):
        # Raise instead of assert: validation must survive `python -O`.
        raise ValueError("model type must be 'supervised', 'semi-supervised', or 'unsupervised'")
    return model_type
def choose_model(df, model_name=None, model_type=None, target=None, supervised_threshold=3000):
    """Preprocess `df`, resolve the learning regime, and train the chosen model."""
    training_df = preprocess_data(df)
    model_type = choose_model_type(training_df, model_type, target, supervised_threshold)

    # Resolve an explicit model request; None means "use the regime default".
    model = None
    if model_name is not None:
        assert model_name in MODELS[model_type], f"model name must be one of {list(MODELS[model_type].keys())}"
        model = MODELS[model_type][model_name]

    if model_type == "unsupervised":
        # No labels: train directly on the full preprocessed frame.
        return train_unsupervised(training_df, model=model)

    # Labeled regimes: split features from the integer-cast label column.
    X_train = training_df.drop(target, axis=1)
    y_train = training_df[target].astype(int)
    if model_type == "semi-supervised":
        return train_semisupervised(X_train, y_train)  # Only one semi-supervised model available
    return train_supervised(X_train, y_train, model=model)
def anomaly_type_to_model_name(anomaly_type):
    """Map an anomaly type to the short name of the best-suited model.

    Args:
        anomaly_type: one of "local", "global", "clustered", "dependency".

    Returns:
        A model-name key (see MODELS) appropriate for that anomaly type.

    Raises:
        ValueError: if `anomaly_type` is not a recognized anomaly type.
    """
    anomaly_type_dict = {
        "local": "lof",
        "global": "knn",
        "clustered": "pca",
        "dependency": "knn",
    }
    try:
        return anomaly_type_dict[anomaly_type]
    except KeyError:
        # Raise instead of assert: validation must survive `python -O`.
        raise ValueError("anomaly type must be 'local', 'global', 'clustered', or 'dependency'") from None
def preprocess_data(df):
    """Return a model-ready copy of `df`: categoricals one-hot encoded, numerics standardized.

    Args:
        df: raw input DataFrame. It is NOT modified.

    Returns:
        A new DataFrame in which object-dtype columns are converted to integer
        category codes and one-hot encoded, and numeric columns are scaled to
        zero mean and unit (population) variance.
    """
    # Work on a copy: the previous implementation assigned into the caller's
    # DataFrame, mutating it in place.
    df = df.copy()

    # One-hot encode categorical columns via their integer category codes.
    categorical_columns = list(df.select_dtypes(include=["object"]).columns.values)
    df[categorical_columns] = df[categorical_columns].astype("category")
    df[categorical_columns] = df[categorical_columns].apply(lambda x: x.cat.codes)
    df = pd.get_dummies(df, columns=categorical_columns)

    # Standardize numeric columns: (x - mean) / population std. This matches
    # sklearn's StandardScaler (ddof=0), including leaving zero-variance
    # columns unscaled, without needing a scaler object.
    numeric_columns = list(df.select_dtypes(include=["number"]).columns.values)
    if numeric_columns:
        means = df[numeric_columns].mean()
        stds = df[numeric_columns].std(ddof=0)
        stds[stds == 0] = 1.0  # avoid division by zero on constant columns
        df[numeric_columns] = (df[numeric_columns] - means) / stds
    return df
def get_model_names(using_args):
    """Resolve the list of model names to train from the USING arguments.

    Precedence (lowest to highest): "anomaly_type" -> "model_name" ->
    "ensemble_models". The result is always a list; a single [None] entry
    means "use the default model for the chosen regime". If the model is not
    an ensemble, the list contains exactly one entry.

    Args:
        using_args: dict of USING clause arguments.

    Returns:
        list of model-name strings, or [None] when nothing was specified.
    """
    model_names = anomaly_type_to_model_name(using_args["anomaly_type"]) if "anomaly_type" in using_args else None
    model_names = using_args.get("model_name", model_names)
    model_names = using_args.get("ensemble_models", model_names)
    # Normalize to a list; isinstance (not `type(...) is str`) is the
    # idiomatic check and also covers str subclasses.
    if model_names is None or isinstance(model_names, str):
        model_names = [model_names]
    return model_names
class AnomalyDetectionHandler(BaseMLEngine):
    """Integration with the PyOD and CatBoost libraries for
    anomaly detection. Both supervised and unsupervised.
    """

    # Engine name under which MindsDB registers this handler.
    name = "anomaly_detection"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.generative = True  # makes unsupervised learning work

    def create(self, target, df, args=None):
        """Train a model and save it to the model storage.

        Trains one model per name returned by get_model_names (several when
        "ensemble_models" was given), dumps each to a .joblib file in the
        model-storage "context" folder, and records paths/targets/class names
        in the "model_args" JSON for predict()/describe().
        """
        args = {} if args is None else args
        # NOTE(review): raises KeyError if the statement had no USING clause —
        # confirm callers always supply args["using"].
        using_args = args["using"]
        model_type = using_args["type"] if "type" in using_args else None
        model_names = get_model_names(using_args)
        model_save_paths = []
        model_targets = []
        model_class_names = []
        base_path = self.model_storage.folder_get("context")
        for model_name in model_names:
            model = choose_model(df, model_name=model_name, model_type=model_type, target=target)
            this_model_target = "outlier" if target is None else target  # output column name for unsupervised learning
            save_path = "model.joblib" if model_name is None else model_name + ".joblib"
            dump(model, os.path.join(base_path, save_path))
            model_save_paths.append(save_path)
            model_targets.append(this_model_target)
            model_class_names.append(model.__class__.__name__)
        model_args = {"model_path": model_save_paths, "target": model_targets, "model_name": model_class_names}
        self.model_storage.json_set("model_args", model_args)
        # Persist the dumped .joblib files back to shared storage.
        self.model_storage.folder_sync("context")

    def predict(self, df, args=None):
        """Load a model from the model storage and use it to make predictions.

        Each saved model votes 0/1 per row; the mean vote is thresholded at
        0.5 to produce the final binary outlier label (majority vote).
        """
        args = {} if args is None else args
        model_args = self.model_storage.json_get("model_args")
        results_list = []
        if "__mindsdb_row_id" in df.columns:
            df = df.drop("__mindsdb_row_id", axis=1)
        # NOTE(review): checks only the first target name but drops the whole
        # target list — assumes all saved models share one target; confirm.
        if model_args["target"][0] in df.columns:
            df = df.drop(model_args["target"], axis=1)
        base_path = self.model_storage.folder_get("context")
        for model_path in model_args["model_path"]:
            model = load(os.path.join(base_path, model_path))
            predict_df = preprocess_data(df).astype(float)
            results = model.predict(predict_df)
            results_list.append(results)
        final_results = np.array(results_list).mean(axis=0)
        # Majority vote across the (possibly single-model) ensemble.
        final_results = np.where(final_results > 0.5, 1, 0)
        return pd.DataFrame({model_args["target"][0]: final_results})

    def describe(self, attribute="model"):
        """Return a DataFrame describing the saved models (name and target).

        Raises:
            NotImplementedError: for any attribute other than "model".
        """
        model_args = self.model_storage.json_get("model_args")
        df = pd.DataFrame({"model_name": [], "target": []})
        if attribute == "model":
            # One row per trained model, in training order.
            for model_name, target in zip(model_args["model_name"], model_args["target"]):
                df2 = pd.DataFrame({"model_name": model_name, "target": target}, index=[0])
                df = pd.concat([df, df2], ignore_index=True)
            return df
        else:
            raise NotImplementedError(f"attribute {attribute} not implemented")