Coverage for mindsdb / interfaces / database / integrations.py: 69%
488 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2import base64
3import shutil
4import ast
5import time
6import tempfile
7import importlib
8import threading
9from pathlib import Path
10from copy import deepcopy
11from typing import Optional
12from textwrap import dedent
13from collections import OrderedDict
14import inspect
16from sqlalchemy import func
18from mindsdb.interfaces.storage import db
19from mindsdb.utilities.config import Config
20from mindsdb.utilities.exception import EntityNotExistsError
21from mindsdb.interfaces.storage.fs import FsStore, FileStorage, RESOURCE_GROUP
22from mindsdb.interfaces.storage.model_fs import HandlerStorage
23from mindsdb.interfaces.file.file_controller import FileController
24from mindsdb.integrations.libs.base import DatabaseHandler
25from mindsdb.integrations.libs.base import BaseMLEngine
26from mindsdb.integrations.libs.api_handler import APIHandler
27from mindsdb.integrations.libs.const import HANDLER_CONNECTION_ARG_TYPE as ARG_TYPE, HANDLER_TYPE
28from mindsdb.interfaces.model.functions import get_model_records
29from mindsdb.utilities.context import context as ctx
30from mindsdb.utilities import log
31from mindsdb.integrations.libs.ml_exec_base import BaseMLEngineExec
32from mindsdb.integrations.libs.base import BaseHandler
33import mindsdb.utilities.profiler as profiler
34from mindsdb.interfaces.database.data_handlers_cache import HandlersCache
36logger = log.getLogger(__name__)
class IntegrationController:
    @staticmethod
    def _is_not_empty_str(s):
        """Return True only when *s* is a non-empty string."""
        if not isinstance(s, str):
            return False
        return len(s) > 0
    def __init__(self):
        # Serializes handler-module imports (see import_handler), which mutate shared state.
        self._import_lock = threading.Lock()
        # Discover available handlers on disk without importing their modules yet.
        self._load_handler_modules()
        self.handlers_cache = HandlersCache()
49 def _add_integration_record(self, name, engine, connection_args):
50 integration_record = db.Integration(
51 name=name, engine=engine, data=connection_args or {}, company_id=ctx.company_id
52 )
53 db.session.add(integration_record)
54 db.session.commit()
55 return integration_record.id
57 def add(self, name: str, engine, connection_args):
58 logger.debug(
59 "%s: add method calling name=%s, engine=%s, connection_args=%s, company_id=%s",
60 self.__class__.__name__,
61 name,
62 engine,
63 connection_args,
64 ctx.company_id,
65 )
66 handler_meta = self.get_handler_meta(engine)
68 accept_connection_args = handler_meta.get("connection_args")
69 logger.debug("%s: accept_connection_args - %s", self.__class__.__name__, accept_connection_args)
71 files_dir = None
72 if accept_connection_args is not None and connection_args is not None:
73 for arg_name, arg_value in connection_args.items():
74 if arg_name in accept_connection_args and accept_connection_args[arg_name]["type"] == ARG_TYPE.PATH: 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true
75 if files_dir is None:
76 files_dir = tempfile.mkdtemp(prefix="mindsdb_files_")
77 shutil.copy(arg_value, files_dir)
78 connection_args[arg_name] = Path(arg_value).name
80 integration_id = self._add_integration_record(name, engine, connection_args)
82 if files_dir is not None: 82 ↛ 83line 82 didn't jump to line 83 because the condition on line 82 was never true
83 store = FileStorage(resource_group=RESOURCE_GROUP.INTEGRATION, resource_id=integration_id, sync=False)
84 store.add(files_dir, "")
85 store.push()
87 if handler_meta.get("type") == HANDLER_TYPE.ML:
88 ml_handler = self.get_ml_handler(name)
89 ml_handler.create_engine(connection_args, integration_id)
91 return integration_id
    def modify(self, name, data, check_connection=False):
        """Update connection data of an existing integration.

        Keys missing from *data* are backfilled from the stored record, so a
        partial update is allowed.

        Args:
            name (str): integration name
            data (dict): new (possibly partial) connection data
            check_connection (bool): when True, test the merged data with a
                temporary handler before persisting

        Raises:
            ValueError: when trying to modify a demo integration
            Exception: when check_connection is set and the test connection fails
        """
        # Drop any cached live handler so later lookups pick up the new data.
        self.handlers_cache.delete(name)
        integration_record = self._get_integration_record(name)
        if isinstance(integration_record.data, dict) and integration_record.data.get("is_demo") is True:
            raise ValueError("It is forbidden to change properties of the demo object")
        old_data = deepcopy(integration_record.data)
        # Preserve old values for keys the caller did not provide.
        for k in old_data:
            if k not in data:
                data[k] = old_data[k]

        # Test the new connection data before applying
        if check_connection:
            try:
                # NOTE(review): the temporary handler is not explicitly disposed of —
                # presumably create_tmp_handler resources are transient; confirm.
                temp_name = f"{integration_record.name}_update_{time.time()}".replace(".", "")
                handler = self.create_tmp_handler(temp_name, integration_record.engine, data)
                status = handler.check_connection()
            except ImportError:
                raise
            if status.success is not True:
                raise Exception(f"Connection test failed: {status.error_message}")

        integration_record.data = data
        db.session.commit()
    def delete(self, name: str, strict_case: bool = False) -> None:
        """Delete an integration by name.

        Args:
            name (str): The name of the integration to delete.
            strict_case (bool, optional): If True, the integration name is case-sensitive. Defaults to False.

        Raises:
            Exception: If the integration cannot be deleted (system, permanent, demo, in use, or has active models).

        Returns:
            None
        """
        if name == "files":
            raise Exception("Unable to drop: is system database")

        # Evict any cached live handler for this integration.
        self.handlers_cache.delete(name)

        # check permanent integration
        if name.lower() in self.handler_modules:
            # NOTE(review): membership uses name.lower() but lookup uses name —
            # a mixed-case name could raise KeyError here; confirm intended.
            handler = self.handler_modules[name]

            if getattr(handler, "permanent", False) is True:
                raise Exception("Unable to drop permanent integration")

        integration_record = self._get_integration_record(name, case_sensitive=strict_case)
        if isinstance(integration_record.data, dict) and integration_record.data.get("is_demo") is True:
            raise Exception("Unable to drop demo object")

        # if this is ml engine
        engine_models = get_model_records(ml_handler_name=name, deleted_at=None)
        active_models = [m.name for m in engine_models if m.deleted_at is None]
        if len(active_models) > 0:
            raise Exception(f"Unable to drop ml engine with active models: {active_models}")

        # check linked KBs
        kb = db.KnowledgeBase.query.filter_by(vector_database_id=integration_record.id).first()
        if kb is not None:
            raise Exception(f"Unable to drop, integration is used by knowledge base: {kb.name}")

        # check linked predictors
        models = get_model_records()
        for model in models:
            if (
                model.data_integration_ref is not None
                and model.data_integration_ref.get("type") == "integration"
                and isinstance(model.data_integration_ref.get("id"), int)
                and model.data_integration_ref["id"] == integration_record.id
            ):
                # detach the predictor from the integration being removed
                model.data_integration_ref = None

        # unlink deleted models
        for model in engine_models:
            if model.deleted_at is not None:
                model.integration_id = None

        db.session.delete(integration_record)
        db.session.commit()
    def _get_integration_record_data(self, integration_record, show_secrets=True):
        """Convert a db.Integration record into a plain metadata dict.

        Args:
            integration_record (db.Integration): record to convert
            show_secrets (bool): when False, connection values marked secret in the
                handler's connection_args spec are masked with "******"

        Returns:
            dict | None: integration metadata, or None when the record has no usable data
        """
        if (
            integration_record is None
            or integration_record.data is None
            or isinstance(integration_record.data, dict) is False
        ):
            return None
        data = deepcopy(integration_record.data)

        # Integrations that rely on certificate/bundle files keep them in the shared
        # FS store; pull the folder into the local integrations dir before use.
        bundle_path = data.get("secure_connect_bundle")
        mysql_ssl_ca = data.get("ssl_ca")
        mysql_ssl_cert = data.get("ssl_cert")
        mysql_ssl_key = data.get("ssl_key")
        if (
            data.get("type") in ("mysql", "mariadb")
            and (
                self._is_not_empty_str(mysql_ssl_ca)
                or self._is_not_empty_str(mysql_ssl_cert)
                or self._is_not_empty_str(mysql_ssl_key)
            )
            or data.get("type") in ("cassandra", "scylla")
            and bundle_path is not None
        ):
            fs_store = FsStore()
            integrations_dir = Config()["paths"]["integrations"]
            folder_name = f"integration_files_{integration_record.company_id}_{integration_record.id}"
            fs_store.get(folder_name, base_dir=integrations_dir)

        handler_meta = self.get_handler_metadata(integration_record.engine)
        integration_type = None
        if isinstance(handler_meta, dict):
            # in other cases, the handler directory is likely not exist.
            integration_type = handler_meta.get("type")

        if show_secrets is False and handler_meta is not None:
            connection_args = handler_meta.get("connection_args", None)
            if isinstance(connection_args, dict):
                if integration_type == HANDLER_TYPE.DATA:
                    # mask values whose arg spec is marked secret
                    for key, value in connection_args.items():
                        if key in data and value.get("secret", False) is True:
                            data[key] = "******"
                elif integration_type == HANDLER_TYPE.ML:
                    # ML engines nest their secret-able args under "creation_args"
                    creation_args = connection_args.get("creation_args")
                    if isinstance(creation_args, dict):
                        for key, value in creation_args.items():
                            if key in data and value.get("secret", False) is True:
                                data[key] = "******"
                else:
                    raise ValueError(f"Unexpected handler type: {integration_type}")
            else:
                # region obsolete, del in future
                if "password" in data:
                    data["password"] = None
                if (
                    data.get("type") == "redis"
                    and isinstance(data.get("connection"), dict)
                    and "password" in data["connection"]
                ):
                    data["connection"] = None
                # endregion

        class_type, permanent = None, False
        if handler_meta is not None:
            class_type = handler_meta.get("class_type")
            permanent = handler_meta.get("permanent", False)

        return {
            "id": integration_record.id,
            "name": integration_record.name,
            "type": integration_type,
            "class_type": class_type,
            "engine": integration_record.engine,
            "permanent": permanent,
            "date_last_update": deepcopy(integration_record.updated_at),  # to del ?
            "connection_data": data,
        }
254 def get_by_id(self, integration_id, show_secrets=True):
255 integration_record = (
256 db.session.query(db.Integration).filter_by(company_id=ctx.company_id, id=integration_id).first()
257 )
258 return self._get_integration_record_data(integration_record, show_secrets)
260 def get(self, name, show_secrets=True, case_sensitive=False):
261 try:
262 integration_record = self._get_integration_record(name, case_sensitive)
263 except EntityNotExistsError:
264 return None
265 return self._get_integration_record_data(integration_record, show_secrets)
    @staticmethod
    def _get_integration_record(name: str, case_sensitive: bool = False) -> db.Integration:
        """Get integration record by name

        Args:
            name (str): name of the integration
            case_sensitive (bool): should search be case sensitive or not

        Raises:
            EntityNotExistsError: when no integration with this name exists
            Exception: when a case-sensitive search finds more than one match

        Returns:
            db.Integration
        """
        if case_sensitive:
            integration_records = db.session.query(db.Integration).filter_by(company_id=ctx.company_id, name=name).all()
            if len(integration_records) > 1:
                raise Exception(f"There is {len(integration_records)} integrations with name '{name}'")
            if len(integration_records) == 0:
                raise EntityNotExistsError(f"There is no integration with name '{name}'")
            integration_record = integration_records[0]
        else:
            # case-insensitive match: lower() applied to both sides in SQL
            integration_record = (
                db.session.query(db.Integration)
                .filter(
                    (db.Integration.company_id == ctx.company_id)
                    & (func.lower(db.Integration.name) == func.lower(name))
                )
                .first()
            )
            if integration_record is None:
                raise EntityNotExistsError(f"There is no integration with name '{name}'")

        return integration_record
299 def get_all(self, show_secrets=True):
300 integration_records = db.session.query(db.Integration).filter_by(company_id=ctx.company_id).all()
301 integration_dict = {}
302 for record in integration_records:
303 if record is None or record.data is None: 303 ↛ 304line 303 didn't jump to line 304 because the condition on line 303 was never true
304 continue
305 integration_dict[record.name] = self._get_integration_record_data(record, show_secrets)
306 return integration_dict
308 def _make_handler_args(
309 self,
310 name: str,
311 handler_type: str,
312 connection_data: dict,
313 integration_id: int = None,
314 file_storage: FileStorage = None,
315 handler_storage: HandlerStorage = None,
316 ):
317 handler_args = dict(
318 name=name,
319 integration_id=integration_id,
320 connection_data=connection_data,
321 file_storage=file_storage,
322 handler_storage=handler_storage,
323 )
325 if handler_type == "files": 325 ↛ 326line 325 didn't jump to line 326 because the condition on line 325 was never true
326 handler_args["file_controller"] = FileController()
327 elif self.handler_modules.get(handler_type, False).type == HANDLER_TYPE.ML: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true
328 handler_args["handler_controller"] = self
329 handler_args["company_id"] = ctx.company_id
331 return handler_args
    def create_tmp_handler(self, name: str, engine: str, connection_args: dict) -> BaseHandler:
        """Create temporary handler, mostly for testing connections

        Args:
            name (str): Integration name
            engine (str): Integration engine name
            connection_args (dict): Connection arguments

        Raises:
            ImportError: when the handler is unknown or its module failed to import

        Returns:
            HandlerClass: Handler class instance
        """
        # Synthetic id: temp handlers have no db record, so derive a unique-ish id from time.
        integration_id = int(time.time() * 10000)

        file_storage = FileStorage(
            resource_group=RESOURCE_GROUP.INTEGRATION, resource_id=integration_id, root_dir="tmp", sync=False
        )
        handler_storage = HandlerStorage(integration_id, root_dir="tmp", is_temporal=True)

        handler_meta = self.get_handler_meta(engine)
        if handler_meta is None:
            raise ImportError(f"Handler '{engine}' does not exist")
        if handler_meta["import"]["success"] is False:
            raise ImportError(f"Handler '{engine}' cannot be imported: {handler_meta['import']['error_message']}")
        HandlerClass = self.handler_modules[engine].Handler

        handler_args = self._make_handler_args(
            name=name,
            handler_type=engine,
            connection_data=connection_args,
            integration_id=integration_id,
            file_storage=file_storage,
            handler_storage=handler_storage,
        )
        handler = HandlerClass(**handler_args)
        return handler
369 def copy_integration_storage(self, integration_id_from, integration_id_to):
370 storage_from = HandlerStorage(integration_id_from)
371 root_path = ""
373 if storage_from.is_empty():
374 return None
375 folder_from = storage_from.folder_get(root_path)
377 storage_to = HandlerStorage(integration_id_to)
378 folder_to = storage_to.folder_get(root_path)
380 shutil.copytree(folder_from, folder_to, dirs_exist_ok=True)
381 storage_to.folder_sync(root_path)
    def get_ml_handler(self, name: str, case_sensitive: bool = False) -> BaseMLEngine:
        """Get ML handler by name
        Args:
            name (str): name of the handler
            case_sensitive (bool): should case be taken into account when searching by name

        Raises:
            EntityNotExistsError: when no integration with this name exists
            Exception: when the engine is unknown or is not an ML handler

        Returns:
            BaseMLEngine
        """
        integration_record = self._get_integration_record(name, case_sensitive)
        integration_engine = integration_record.engine

        integration_meta = self.get_handler_meta(integration_engine)
        if integration_meta is None:
            raise Exception(f"Handler '{name}' does not exists")

        if integration_meta.get("type") != HANDLER_TYPE.ML:
            raise Exception(f"Handler '{name}' must be ML type")

        logger.info(
            f"{self.__class__.__name__}.get_handler: create a ML client "
            + f"{integration_record.name}/{integration_record.id}"
        )
        # BaseMLEngineExec wraps the raw module in the ML-engine execution shim.
        handler = BaseMLEngineExec(
            name=integration_record.name,
            integration_id=integration_record.id,
            handler_module=self.handler_modules[integration_engine],
        )

        return handler
    @profiler.profile()
    def get_data_handler(self, name: str, case_sensitive: bool = False, connect=True) -> BaseHandler:
        """Get DATA handler (DB or API) by name

        Args:
            name (str): name of the handler
            case_sensitive (bool): should case be taken into account when searching by name
            connect (bool): when True, register the instantiated handler in the shared cache

        Returns:
            BaseHandler: data handler
        """
        # Fast path: reuse a cached live handler.
        handler = self.handlers_cache.get(name)
        if handler is not None:
            ctx.used_handlers.add(getattr(handler.__class__, "name", handler.__class__.__name__))
            return handler

        integration_record = self._get_integration_record(name, case_sensitive)
        integration_engine = integration_record.engine

        integration_meta = self.get_handler_meta(integration_engine)

        if integration_meta is None:
            raise Exception(f"Handler '{name}' does not exist")

        if integration_meta.get("type") != HANDLER_TYPE.DATA:
            raise Exception(f"Handler '{name}' must be DATA type")

        integration_data = self._get_integration_record_data(integration_record, True)
        if integration_data is None:
            raise Exception(f"Can't find integration_record for handler '{name}'")
        connection_data = integration_data.get("connection_data", {})
        logger.debug(
            "%s.get_handler: connection_data=%s, engine=%s",
            self.__class__.__name__,
            connection_data,
            integration_engine,
        )

        if integration_meta["import"]["success"] is False:
            msg = dedent(f"""\
                Handler '{integration_engine}' cannot be used. Reason is:
                    {integration_meta["import"]["error_message"]}
            """)
            is_cloud = Config().get("cloud", False)
            if is_cloud is False:
                msg += dedent(f"""
                    If error is related to missing dependencies, then try to run command in shell and restart mindsdb:
                        pip install mindsdb[{integration_engine}]
                """)
            logger.debug(msg)
            raise Exception(msg)

        connection_args = integration_meta.get("connection_args")
        logger.debug("%s.get_handler: connection args - %s", self.__class__.__name__, connection_args)

        file_storage = FileStorage(
            resource_group=RESOURCE_GROUP.INTEGRATION,
            resource_id=integration_record.id,
            sync=True,
        )
        handler_storage = HandlerStorage(integration_record.id)

        if isinstance(connection_args, (dict, OrderedDict)):
            # PATH-type connection args were stored by base file name only (see `add`);
            # resolve them back to real paths inside the integration's file storage.
            files_to_get = {
                arg_name: arg_value
                for arg_name, arg_value in connection_data.items()
                if arg_name in connection_args and connection_args.get(arg_name)["type"] == ARG_TYPE.PATH
            }
            if len(files_to_get) > 0:
                for file_name, file_path in files_to_get.items():
                    connection_data[file_name] = file_storage.get_path(file_path)

        handler_ars = self._make_handler_args(
            name=name,
            handler_type=integration_engine,
            connection_data=connection_data,
            integration_id=integration_data["id"],
            file_storage=file_storage,
            handler_storage=handler_storage,
        )

        HandlerClass = self.handler_modules[integration_engine].Handler
        handler = HandlerClass(**handler_ars)
        if connect:
            self.handlers_cache.set(handler)

        ctx.used_handlers.add(getattr(handler.__class__, "name", handler.__class__.__name__))
        return handler
    def reload_handler_module(self, handler_name):
        """Re-import an already-imported handler module and refresh its metadata entry."""
        importlib.reload(self.handler_modules[handler_name])
        try:
            handler_meta = self._get_handler_meta(handler_name)
        except Exception as e:
            # Keep the existing meta entry but mark the import as failed.
            handler_meta = self.handlers_import_status[handler_name]
            handler_meta["import"]["success"] = False
            handler_meta["import"]["error_message"] = str(e)

        self.handlers_import_status[handler_name] = handler_meta
515 def _read_dependencies(self, path):
516 dependencies = []
517 requirements_txt = Path(path).joinpath("requirements.txt")
518 if requirements_txt.is_file():
519 with open(str(requirements_txt), "rt") as f:
520 dependencies = [x.strip(" \t\n") for x in f.readlines()]
521 dependencies = [x for x in dependencies if len(x) > 0]
522 return dependencies
    def _get_handler_meta(self, handler_name):
        """Collect metadata from an imported handler module and update its registry entry.

        Must only be called after the module is present in self.handler_modules.
        """
        module = self.handler_modules[handler_name]

        handler_dir = Path(module.__path__[0])
        handler_folder_name = handler_dir.name

        # Handler packages set `import_error` instead of raising on bad imports.
        import_error = getattr(module, "import_error", None)
        handler_meta = self.handlers_import_status[handler_name]
        handler_meta["import"]["success"] = import_error is None
        handler_meta["version"] = module.version
        handler_meta["thread_safe"] = getattr(module, "cache_thread_safe", False)

        if import_error is not None:
            handler_meta["import"]["error_message"] = str(import_error)

        handler_type = getattr(module, "type", None)
        handler_class = None
        if hasattr(module, "Handler") and inspect.isclass(module.Handler):
            handler_class = module.Handler
            if issubclass(handler_class, BaseMLEngine):
                handler_meta["class_type"] = "ml"
            elif issubclass(handler_class, DatabaseHandler):
                handler_meta["class_type"] = "sql"
            # APIHandler check runs last so it overrides "sql" when both apply.
            if issubclass(handler_class, APIHandler):
                handler_meta["class_type"] = "api"

        if handler_type == HANDLER_TYPE.ML:
            # for ml engines, patch the connection_args from the argument probing
            if handler_class:
                try:
                    prediction_args = handler_class.prediction_args()
                    creation_args = getattr(module, "creation_args", handler_class.creation_args())
                    connection_args = {"prediction": prediction_args, "creation_args": creation_args}
                    setattr(module, "connection_args", connection_args)
                    logger.debug("Patched connection_args for %s", handler_folder_name)
                except Exception as e:
                    # do nothing
                    logger.debug("Failed to patch connection_args for %s, reason: %s", handler_folder_name, str(e))

        # Copy the module's descriptive attributes (if present) into the meta entry.
        module_attrs = [
            attr
            for attr in ["connection_args_example", "connection_args", "description", "type", "title"]
            if hasattr(module, attr)
        ]

        for attr in module_attrs:
            handler_meta[attr] = getattr(module, attr)

        # endregion
        if hasattr(module, "permanent"):
            handler_meta["permanent"] = module.permanent
        else:
            # These built-in handlers are always permanent even without the attribute.
            if handler_meta.get("name") in ("files", "views", "lightwood"):
                handler_meta["permanent"] = True
            else:
                handler_meta["permanent"] = False

        return handler_meta
583 def _get_handler_icon(self, handler_dir, icon_path):
584 icon = {}
585 try:
586 icon_path = handler_dir.joinpath(icon_path)
587 icon_type = icon_path.name[icon_path.name.rfind(".") + 1 :].lower()
589 if icon_type == "svg":
590 with open(str(icon_path), "rt") as f:
591 icon["data"] = f.read()
592 else:
593 with open(str(icon_path), "rb") as f:
594 icon["data"] = base64.b64encode(f.read()).decode("utf-8")
596 icon["name"] = icon_path.name
597 icon["type"] = icon_type
599 except Exception as e:
600 logger.error(f"Error reading icon for {handler_dir}, {e}!")
601 return icon
    def _load_handler_modules(self):
        """Scan the handlers directory and register metadata for every handler found.

        Modules are NOT imported here; each entry starts with import.success=None so
        get_handler_meta() can import lazily on first use.
        """
        mindsdb_path = Path(importlib.util.find_spec("mindsdb").origin).parent
        handlers_path = mindsdb_path.joinpath("integrations/handlers")

        # edge case: running from tests directory, find_spec finds the base folder instead of actual package
        if not os.path.isdir(handlers_path):
            mindsdb_path = Path(importlib.util.find_spec("mindsdb").origin).parent.joinpath("mindsdb")
            handlers_path = mindsdb_path.joinpath("integrations/handlers")

        self.handler_modules = {}
        self.handlers_import_status = {}
        for handler_dir in handlers_path.iterdir():
            if handler_dir.is_dir() is False or handler_dir.name.startswith("__"):
                continue

            # Static inspection only (ast-based) — no import side effects.
            handler_info = self._get_handler_info(handler_dir)
            if "name" not in handler_info:
                continue
            handler_name = handler_info["name"]
            dependencies = self._read_dependencies(handler_dir)
            handler_meta = {
                "path": handler_dir,
                "import": {
                    "success": None,  # None means "not attempted yet"
                    "error_message": None,
                    "folder": handler_dir.name,
                    "dependencies": dependencies,
                },
                "name": handler_name,
                "permanent": handler_info.get("permanent", False),
                "connection_args": handler_info.get("connection_args", None),
                "class_type": handler_info.get("class_type", None),
                "type": handler_info.get("type"),
            }
            if "icon_path" in handler_info:
                icon = self._get_handler_icon(handler_dir, handler_info["icon_path"])
                if icon:
                    handler_meta["icon"] = icon
            self.handlers_import_status[handler_name] = handler_meta
643 def _get_connection_args(self, args_file: Path, param_name: str) -> dict:
644 """
645 Extract connection args dict from connection args file of a handler
647 :param args_file: path to file with connection args
648 :param param_name: the name of variable which contains connection args
649 :return: extracted connection arguments
650 """
652 code = ast.parse(args_file.read_text())
654 args = {}
655 for item in code.body:
656 if not isinstance(item, ast.Assign):
657 continue
658 if not item.targets[0].id == param_name:
659 continue
660 if hasattr(item.value, "keywords"): 660 ↛ 655line 660 didn't jump to line 655 because the condition on line 660 was always true
661 for keyword in item.value.keywords:
662 name = keyword.arg
663 params = keyword.value
664 if isinstance(params, ast.Dict): 664 ↛ 679line 664 didn't jump to line 679 because the condition on line 664 was always true
665 # get dict value
666 info = {}
667 for i, k in enumerate(params.keys):
668 if not isinstance(k, ast.Constant): 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true
669 continue
670 v = params.values[i]
671 if isinstance(v, ast.Constant):
672 v = v.value
673 elif isinstance(v, ast.Attribute): 673 ↛ 677line 673 didn't jump to line 677 because the condition on line 673 was always true
674 # assume it is ARG_TYPE
675 v = getattr(ARG_TYPE, v.attr, None)
676 else:
677 v = None
678 info[k.value] = v
679 args[name] = info
680 return args
682 def _get_base_class_type(self, code, handler_dir: Path) -> Optional[str]:
683 """
684 Find base class of data handler: sql or api
685 It tries to find import inside try-except of init file
686 They parsed this import in order to find base class of data handler
688 :param code: parsed code of __init__ file of a handler
689 :param handler_dir: folder of a handler
690 :return: base class type
691 """
693 module_file = None
694 for block in code.body:
695 if not isinstance(block, ast.Try):
696 continue
697 for item in block.body: 697 ↛ 694line 697 didn't jump to line 694 because the loop on line 697 didn't complete
698 if isinstance(item, ast.ImportFrom): 698 ↛ 697line 698 didn't jump to line 697 because the condition on line 698 was always true
699 module_file = item.module
700 break
701 if module_file is None:
702 return
704 path = handler_dir / f"{module_file}.py"
706 if not path.exists(): 706 ↛ 707line 706 didn't jump to line 707 because the condition on line 706 was never true
707 return
708 code = ast.parse(path.read_text())
709 # find base class of handler.
710 # TODO trace inheritance (is used only for sql handler)
711 for item in code.body:
712 if isinstance(item, ast.ClassDef):
713 bases = [base.id for base in item.bases]
714 if "APIHandler" in bases or "MetaAPIHandler" in bases:
715 return "api"
716 return "sql"
718 def _get_handler_info(self, handler_dir: Path) -> dict:
719 """
720 Get handler info without importing it
721 :param handler_dir: folder of handler
722 :return: Extracted params:
723 - defined constants in init file
724 - connection arguments
725 """
727 init_file = handler_dir / "__init__.py"
728 if not init_file.exists(): 728 ↛ 729line 728 didn't jump to line 729 because the condition on line 728 was never true
729 return {}
730 code = ast.parse(init_file.read_text())
732 info = {}
733 for item in code.body:
734 if not isinstance(item, ast.Assign):
735 continue
736 if isinstance(item.targets[0], ast.Name): 736 ↛ 733line 736 didn't jump to line 733 because the condition on line 736 was always true
737 name = item.targets[0].id
738 if isinstance(item.value, ast.Constant):
739 info[name] = item.value.value
740 if isinstance(item.value, ast.Attribute) and name == "type":
741 if item.value.attr == "ML":
742 info[name] = HANDLER_TYPE.ML
743 info["class_type"] = "ml"
744 else:
745 info[name] = HANDLER_TYPE.DATA
746 info["class_type"] = self._get_base_class_type(code, handler_dir) or "sql"
748 # connection args
749 if info["type"] == HANDLER_TYPE.ML:
750 args_file = handler_dir / "creation_args.py"
751 if args_file.exists():
752 info["connection_args"] = {
753 "prediction": {},
754 "creation_args": self._get_connection_args(args_file, "creation_args"),
755 }
756 else:
757 args_file = handler_dir / "connection_args.py"
758 if args_file.exists():
759 info["connection_args"] = self._get_connection_args(args_file, "connection_args")
761 return info
    def import_handler(self, handler_name: str, base_import: str = None):
        """Import a handler's module under the import lock and record the outcome.

        Args:
            handler_name (str): registered handler name
            base_import (str): package prefix to import from; defaults to
                "mindsdb.integrations.handlers."

        Returns:
            dict: the (possibly updated) handler metadata entry
        """
        with self._import_lock:
            time_before_import = time.perf_counter()
            logger.debug(f"Importing handler '{handler_name}'")
            handler_meta = self.handlers_import_status[handler_name]
            handler_dir = handler_meta["path"]

            handler_folder_name = str(handler_dir.name)
            if base_import is None:
                base_import = "mindsdb.integrations.handlers."

            try:
                handler_module = importlib.import_module(f"{base_import}{handler_folder_name}")
                self.handler_modules[handler_name] = handler_module
                handler_meta = self._get_handler_meta(handler_name)
                logger.debug(
                    f"Handler '{handler_name}' imported successfully in {(time.perf_counter() - time_before_import):.3f} seconds"
                )
            except Exception as e:
                # Import failures are recorded, not raised; callers inspect meta["import"].
                handler_meta["import"]["success"] = False
                handler_meta["import"]["error_message"] = str(e)
                logger.debug(f"Failed to import handler '{handler_name}': {e}")

            self.handlers_import_status[handler_meta["name"]] = handler_meta
            return handler_meta
789 def get_handlers_import_status(self):
790 # tries to import all not imported yet
792 result = {}
793 for handler_name in list(self.handlers_import_status.keys()):
794 handler_meta = self.get_handler_meta(handler_name)
795 result[handler_name] = handler_meta
797 return result
    def get_handlers_metadata(self):
        """Return the raw {name: metadata} registry; does NOT trigger imports."""
        return self.handlers_import_status
    def get_handler_metadata(self, handler_name):
        # returns metadata; None when the handler is unknown. Does not import the module.
        return self.handlers_import_status.get(handler_name)
806 def get_handler_meta(self, handler_name):
807 # returns metadata and tries to import it
808 handler_meta = self.handlers_import_status.get(handler_name)
809 if handler_meta is None: 809 ↛ 810line 809 didn't jump to line 810 because the condition on line 809 was never true
810 return
811 if handler_meta["import"]["success"] is None:
812 handler_meta = self.import_handler(handler_name)
813 return handler_meta
815 def get_handler_module(self, handler_name):
816 handler_meta = self.get_handler_meta(handler_name)
817 if handler_meta is None: 817 ↛ 818line 817 didn't jump to line 818 because the condition on line 817 was never true
818 return
819 if handler_meta["import"]["success"]: 819 ↛ exitline 819 didn't return from function 'get_handler_module' because the condition on line 819 was always true
820 return self.handler_modules[handler_name]
    def create_permanent_integrations(self):
        """Ensure a db record exists for every handler flagged permanent.

        Records are created with company_id=None, i.e. shared across companies.
        """
        for (
            integration_name,
            handler,
        ) in self.get_handlers_metadata().items():
            if handler.get("permanent"):
                # NOTE(review): uses the module-level singleton rather than self — confirm intentional.
                integration_meta = integration_controller.get(name=integration_name)
                if integration_meta is None:
                    integration_record = db.Integration(
                        name=integration_name,
                        data={},
                        engine=integration_name,
                        company_id=None,
                    )
                    db.session.add(integration_record)
                    db.session.commit()
# Module-level singleton; other modules import and share this instance.
integration_controller = IntegrationController()