Coverage for mindsdb / interfaces / database / integrations.py: 69%

488 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2import base64 

3import shutil 

4import ast 

5import time 

6import tempfile 

7import importlib 

8import threading 

9from pathlib import Path 

10from copy import deepcopy 

11from typing import Optional 

12from textwrap import dedent 

13from collections import OrderedDict 

14import inspect 

15 

16from sqlalchemy import func 

17 

18from mindsdb.interfaces.storage import db 

19from mindsdb.utilities.config import Config 

20from mindsdb.utilities.exception import EntityNotExistsError 

21from mindsdb.interfaces.storage.fs import FsStore, FileStorage, RESOURCE_GROUP 

22from mindsdb.interfaces.storage.model_fs import HandlerStorage 

23from mindsdb.interfaces.file.file_controller import FileController 

24from mindsdb.integrations.libs.base import DatabaseHandler 

25from mindsdb.integrations.libs.base import BaseMLEngine 

26from mindsdb.integrations.libs.api_handler import APIHandler 

27from mindsdb.integrations.libs.const import HANDLER_CONNECTION_ARG_TYPE as ARG_TYPE, HANDLER_TYPE 

28from mindsdb.interfaces.model.functions import get_model_records 

29from mindsdb.utilities.context import context as ctx 

30from mindsdb.utilities import log 

31from mindsdb.integrations.libs.ml_exec_base import BaseMLEngineExec 

32from mindsdb.integrations.libs.base import BaseHandler 

33import mindsdb.utilities.profiler as profiler 

34from mindsdb.interfaces.database.data_handlers_cache import HandlersCache 

35 

# Module-level logger for integration management operations.
logger = log.getLogger(__name__)

37 

38 

39class IntegrationController: 

40 @staticmethod 

41 def _is_not_empty_str(s): 

42 return isinstance(s, str) and len(s) > 0 

43 

    def __init__(self):
        # Serializes handler module imports across threads (used by import_handler).
        self._import_lock = threading.Lock()
        # Pre-collect static metadata for all handlers without importing them.
        self._load_handler_modules()
        # Cache of instantiated (data) handlers, keyed by integration name.
        self.handlers_cache = HandlersCache()

48 

49 def _add_integration_record(self, name, engine, connection_args): 

50 integration_record = db.Integration( 

51 name=name, engine=engine, data=connection_args or {}, company_id=ctx.company_id 

52 ) 

53 db.session.add(integration_record) 

54 db.session.commit() 

55 return integration_record.id 

56 

    def add(self, name: str, engine, connection_args):
        """Create a new integration and, for ML engines, its engine storage.

        Args:
            name (str): name of the new integration.
            engine (str): handler engine backing the integration.
            connection_args (dict | None): connection parameters. Values of
                PATH-type arguments are copied into the integration's file
                storage and replaced in-place by the bare file name.

        Returns:
            int: id of the created integration record.
        """
        logger.debug(
            "%s: add method calling name=%s, engine=%s, connection_args=%s, company_id=%s",
            self.__class__.__name__,
            name,
            engine,
            connection_args,
            ctx.company_id,
        )
        handler_meta = self.get_handler_meta(engine)

        accept_connection_args = handler_meta.get("connection_args")
        logger.debug("%s: accept_connection_args - %s", self.__class__.__name__, accept_connection_args)

        # Stage PATH-type argument files in a temp dir; they are pushed to the
        # integration's file storage once the record exists (id is needed).
        files_dir = None
        if accept_connection_args is not None and connection_args is not None:
            for arg_name, arg_value in connection_args.items():
                if arg_name in accept_connection_args and accept_connection_args[arg_name]["type"] == ARG_TYPE.PATH:
                    if files_dir is None:
                        files_dir = tempfile.mkdtemp(prefix="mindsdb_files_")
                    shutil.copy(arg_value, files_dir)
                    # keep only the file name; full path is resolved from file storage later
                    connection_args[arg_name] = Path(arg_value).name

        integration_id = self._add_integration_record(name, engine, connection_args)

        if files_dir is not None:
            store = FileStorage(resource_group=RESOURCE_GROUP.INTEGRATION, resource_id=integration_id, sync=False)
            store.add(files_dir, "")
            store.push()

        # ML engines additionally need an engine created on the handler side.
        if handler_meta.get("type") == HANDLER_TYPE.ML:
            ml_handler = self.get_ml_handler(name)
            ml_handler.create_engine(connection_args, integration_id)

        return integration_id

92 

93 def modify(self, name, data, check_connection=False): 

94 self.handlers_cache.delete(name) 

95 integration_record = self._get_integration_record(name) 

96 if isinstance(integration_record.data, dict) and integration_record.data.get("is_demo") is True: 96 ↛ 97line 96 didn't jump to line 97 because the condition on line 96 was never true

97 raise ValueError("It is forbidden to change properties of the demo object") 

98 old_data = deepcopy(integration_record.data) 

99 for k in old_data: 99 ↛ 100line 99 didn't jump to line 100 because the loop on line 99 never started

100 if k not in data: 

101 data[k] = old_data[k] 

102 

103 # Test the new connection data before applying 

104 if check_connection: 104 ↛ 105line 104 didn't jump to line 105 because the condition on line 104 was never true

105 try: 

106 temp_name = f"{integration_record.name}_update_{time.time()}".replace(".", "") 

107 handler = self.create_tmp_handler(temp_name, integration_record.engine, data) 

108 status = handler.check_connection() 

109 except ImportError: 

110 raise 

111 

112 if status.success is not True: 

113 raise Exception(f"Connection test failed: {status.error_message}") 

114 

115 integration_record.data = data 

116 db.session.commit() 

117 

118 def delete(self, name: str, strict_case: bool = False) -> None: 

119 """Delete an integration by name. 

120 

121 Args: 

122 name (str): The name of the integration to delete. 

123 strict_case (bool, optional): If True, the integration name is case-sensitive. Defaults to False. 

124 

125 Raises: 

126 Exception: If the integration cannot be deleted (system, permanent, demo, in use, or has active models). 

127 

128 Returns: 

129 None 

130 """ 

131 if name == "files": 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true

132 raise Exception("Unable to drop: is system database") 

133 

134 self.handlers_cache.delete(name) 

135 

136 # check permanent integration 

137 if name.lower() in self.handler_modules: 137 ↛ 138line 137 didn't jump to line 138 because the condition on line 137 was never true

138 handler = self.handler_modules[name] 

139 

140 if getattr(handler, "permanent", False) is True: 

141 raise Exception("Unable to drop permanent integration") 

142 

143 integration_record = self._get_integration_record(name, case_sensitive=strict_case) 

144 if isinstance(integration_record.data, dict) and integration_record.data.get("is_demo") is True: 144 ↛ 145line 144 didn't jump to line 145 because the condition on line 144 was never true

145 raise Exception("Unable to drop demo object") 

146 

147 # if this is ml engine 

148 engine_models = get_model_records(ml_handler_name=name, deleted_at=None) 

149 active_models = [m.name for m in engine_models if m.deleted_at is None] 

150 if len(active_models) > 0: 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true

151 raise Exception(f"Unable to drop ml engine with active models: {active_models}") 

152 

153 # check linked KBs 

154 kb = db.KnowledgeBase.query.filter_by(vector_database_id=integration_record.id).first() 

155 if kb is not None: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true

156 raise Exception(f"Unable to drop, integration is used by knowledge base: {kb.name}") 

157 

158 # check linked predictors 

159 models = get_model_records() 

160 for model in models: 160 ↛ 161line 160 didn't jump to line 161 because the loop on line 160 never started

161 if ( 

162 model.data_integration_ref is not None 

163 and model.data_integration_ref.get("type") == "integration" 

164 and isinstance(model.data_integration_ref.get("id"), int) 

165 and model.data_integration_ref["id"] == integration_record.id 

166 ): 

167 model.data_integration_ref = None 

168 

169 # unlink deleted models 

170 for model in engine_models: 170 ↛ 171line 170 didn't jump to line 171 because the loop on line 170 never started

171 if model.deleted_at is not None: 

172 model.integration_id = None 

173 

174 db.session.delete(integration_record) 

175 db.session.commit() 

176 

    def _get_integration_record_data(self, integration_record, show_secrets=True):
        """Serialize an Integration DB record into a plain dict.

        Args:
            integration_record (db.Integration): record to serialize.
            show_secrets (bool): when False, connection values declared as
                secret by the handler are masked with "******".

        Returns:
            dict | None: serialized record, or None when the record is missing
            or its data is not a dict.
        """
        if (
            integration_record is None
            or integration_record.data is None
            or isinstance(integration_record.data, dict) is False
        ):
            return None
        data = deepcopy(integration_record.data)

        # SSL/bundle files live in fs storage; pull them to the local
        # integrations dir when this integration type needs them.
        bundle_path = data.get("secure_connect_bundle")
        mysql_ssl_ca = data.get("ssl_ca")
        mysql_ssl_cert = data.get("ssl_cert")
        mysql_ssl_key = data.get("ssl_key")
        # NOTE(review): `and` binds tighter than `or`, so this reads as
        # (mysql/mariadb with any ssl file) OR (cassandra/scylla with a bundle)
        # — presumably intended; confirm if touching this condition.
        if (
            data.get("type") in ("mysql", "mariadb")
            and (
                self._is_not_empty_str(mysql_ssl_ca)
                or self._is_not_empty_str(mysql_ssl_cert)
                or self._is_not_empty_str(mysql_ssl_key)
            )
            or data.get("type") in ("cassandra", "scylla")
            and bundle_path is not None
        ):
            fs_store = FsStore()
            integrations_dir = Config()["paths"]["integrations"]
            folder_name = f"integration_files_{integration_record.company_id}_{integration_record.id}"
            fs_store.get(folder_name, base_dir=integrations_dir)

        handler_meta = self.get_handler_metadata(integration_record.engine)
        integration_type = None
        if isinstance(handler_meta, dict):
            # in other cases, the handler directory likely does not exist.
            integration_type = handler_meta.get("type")

        if show_secrets is False and handler_meta is not None:
            connection_args = handler_meta.get("connection_args", None)
            if isinstance(connection_args, dict):
                # Mask every value the handler declares as secret.
                if integration_type == HANDLER_TYPE.DATA:
                    for key, value in connection_args.items():
                        if key in data and value.get("secret", False) is True:
                            data[key] = "******"
                elif integration_type == HANDLER_TYPE.ML:
                    # ML handlers nest their secret flags under "creation_args".
                    creation_args = connection_args.get("creation_args")
                    if isinstance(creation_args, dict):
                        for key, value in creation_args.items():
                            if key in data and value.get("secret", False) is True:
                                data[key] = "******"
                else:
                    raise ValueError(f"Unexpected handler type: {integration_type}")
            else:
                # region obsolete, del in future
                if "password" in data:
                    data["password"] = None
                if (
                    data.get("type") == "redis"
                    and isinstance(data.get("connection"), dict)
                    and "password" in data["connection"]
                ):
                    data["connection"] = None
                # endregion

        class_type, permanent = None, False
        if handler_meta is not None:
            class_type = handler_meta.get("class_type")
            permanent = handler_meta.get("permanent", False)

        return {
            "id": integration_record.id,
            "name": integration_record.name,
            "type": integration_type,
            "class_type": class_type,
            "engine": integration_record.engine,
            "permanent": permanent,
            "date_last_update": deepcopy(integration_record.updated_at),  # to del ?
            "connection_data": data,
        }

253 

254 def get_by_id(self, integration_id, show_secrets=True): 

255 integration_record = ( 

256 db.session.query(db.Integration).filter_by(company_id=ctx.company_id, id=integration_id).first() 

257 ) 

258 return self._get_integration_record_data(integration_record, show_secrets) 

259 

260 def get(self, name, show_secrets=True, case_sensitive=False): 

261 try: 

262 integration_record = self._get_integration_record(name, case_sensitive) 

263 except EntityNotExistsError: 

264 return None 

265 return self._get_integration_record_data(integration_record, show_secrets) 

266 

    @staticmethod
    def _get_integration_record(name: str, case_sensitive: bool = False) -> db.Integration:
        """Get integration record by name.

        Args:
            name (str): name of the integration
            case_sensitive (bool): should search be case sensitive or not

        Returns:
            db.Integration: the matching record

        Raises:
            EntityNotExistsError: when no integration with that name exists.
            Exception: (case-sensitive path only) when more than one record matches.
        """
        if case_sensitive:
            integration_records = db.session.query(db.Integration).filter_by(company_id=ctx.company_id, name=name).all()
            # duplicates are possible here because uniqueness is enforced case-insensitively
            if len(integration_records) > 1:
                raise Exception(f"There is {len(integration_records)} integrations with name '{name}'")
            if len(integration_records) == 0:
                raise EntityNotExistsError(f"There is no integration with name '{name}'")
            integration_record = integration_records[0]
        else:
            # case-insensitive match via lower() on both sides
            integration_record = (
                db.session.query(db.Integration)
                .filter(
                    (db.Integration.company_id == ctx.company_id)
                    & (func.lower(db.Integration.name) == func.lower(name))
                )
                .first()
            )
            if integration_record is None:
                raise EntityNotExistsError(f"There is no integration with name '{name}'")

        return integration_record

298 

299 def get_all(self, show_secrets=True): 

300 integration_records = db.session.query(db.Integration).filter_by(company_id=ctx.company_id).all() 

301 integration_dict = {} 

302 for record in integration_records: 

303 if record is None or record.data is None: 303 ↛ 304line 303 didn't jump to line 304 because the condition on line 303 was never true

304 continue 

305 integration_dict[record.name] = self._get_integration_record_data(record, show_secrets) 

306 return integration_dict 

307 

308 def _make_handler_args( 

309 self, 

310 name: str, 

311 handler_type: str, 

312 connection_data: dict, 

313 integration_id: int = None, 

314 file_storage: FileStorage = None, 

315 handler_storage: HandlerStorage = None, 

316 ): 

317 handler_args = dict( 

318 name=name, 

319 integration_id=integration_id, 

320 connection_data=connection_data, 

321 file_storage=file_storage, 

322 handler_storage=handler_storage, 

323 ) 

324 

325 if handler_type == "files": 325 ↛ 326line 325 didn't jump to line 326 because the condition on line 325 was never true

326 handler_args["file_controller"] = FileController() 

327 elif self.handler_modules.get(handler_type, False).type == HANDLER_TYPE.ML: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true

328 handler_args["handler_controller"] = self 

329 handler_args["company_id"] = ctx.company_id 

330 

331 return handler_args 

332 

    def create_tmp_handler(self, name: str, engine: str, connection_args: dict) -> BaseHandler:
        """Create temporary handler, mostly for testing connections.

        The handler is backed by temporary ("tmp"-rooted) file/handler storage
        and a synthetic integration id, so nothing is persisted.

        Args:
            name (str): Integration name
            engine (str): Integration engine name
            connection_args (dict): Connection arguments

        Returns:
            BaseHandler: instantiated handler

        Raises:
            ImportError: when the engine is unknown or failed to import.
        """
        # synthetic id: temp handlers never correspond to a DB record
        integration_id = int(time.time() * 10000)

        file_storage = FileStorage(
            resource_group=RESOURCE_GROUP.INTEGRATION, resource_id=integration_id, root_dir="tmp", sync=False
        )
        handler_storage = HandlerStorage(integration_id, root_dir="tmp", is_temporal=True)

        handler_meta = self.get_handler_meta(engine)
        if handler_meta is None:
            raise ImportError(f"Handler '{engine}' does not exist")
        if handler_meta["import"]["success"] is False:
            raise ImportError(f"Handler '{engine}' cannot be imported: {handler_meta['import']['error_message']}")
        HandlerClass = self.handler_modules[engine].Handler

        handler_args = self._make_handler_args(
            name=name,
            handler_type=engine,
            connection_data=connection_args,
            integration_id=integration_id,
            file_storage=file_storage,
            handler_storage=handler_storage,
        )
        handler = HandlerClass(**handler_args)
        return handler

368 

369 def copy_integration_storage(self, integration_id_from, integration_id_to): 

370 storage_from = HandlerStorage(integration_id_from) 

371 root_path = "" 

372 

373 if storage_from.is_empty(): 

374 return None 

375 folder_from = storage_from.folder_get(root_path) 

376 

377 storage_to = HandlerStorage(integration_id_to) 

378 folder_to = storage_to.folder_get(root_path) 

379 

380 shutil.copytree(folder_from, folder_to, dirs_exist_ok=True) 

381 storage_to.folder_sync(root_path) 

382 

383 def get_ml_handler(self, name: str, case_sensitive: bool = False) -> BaseMLEngine: 

384 """Get ML handler by name 

385 Args: 

386 name (str): name of the handler 

387 case_sensitive (bool): should case be taken into account when searching by name 

388 

389 Returns: 

390 BaseMLEngine 

391 """ 

392 integration_record = self._get_integration_record(name, case_sensitive) 

393 integration_engine = integration_record.engine 

394 

395 integration_meta = self.get_handler_meta(integration_engine) 

396 if integration_meta is None: 396 ↛ 397line 396 didn't jump to line 397 because the condition on line 396 was never true

397 raise Exception(f"Handler '{name}' does not exists") 

398 

399 if integration_meta.get("type") != HANDLER_TYPE.ML: 399 ↛ 400line 399 didn't jump to line 400 because the condition on line 399 was never true

400 raise Exception(f"Handler '{name}' must be ML type") 

401 

402 logger.info( 

403 f"{self.__class__.__name__}.get_handler: create a ML client " 

404 + f"{integration_record.name}/{integration_record.id}" 

405 ) 

406 handler = BaseMLEngineExec( 

407 name=integration_record.name, 

408 integration_id=integration_record.id, 

409 handler_module=self.handler_modules[integration_engine], 

410 ) 

411 

412 return handler 

413 

    @profiler.profile()
    def get_data_handler(self, name: str, case_sensitive: bool = False, connect=True) -> BaseHandler:
        """Get DATA handler (DB or API) by name

        Args:
            name (str): name of the handler
            case_sensitive (bool): should case be taken into account when searching by name
            connect (bool): when True, the created handler is stored in the handlers cache

        Returns:
            BaseHandler: data handler
        """
        # fast path: reuse a cached handler for this integration
        handler = self.handlers_cache.get(name)
        if handler is not None:
            ctx.used_handlers.add(getattr(handler.__class__, "name", handler.__class__.__name__))
            return handler

        integration_record = self._get_integration_record(name, case_sensitive)
        integration_engine = integration_record.engine

        integration_meta = self.get_handler_meta(integration_engine)

        if integration_meta is None:
            raise Exception(f"Handler '{name}' does not exist")

        if integration_meta.get("type") != HANDLER_TYPE.DATA:
            raise Exception(f"Handler '{name}' must be DATA type")

        integration_data = self._get_integration_record_data(integration_record, True)
        if integration_data is None:
            raise Exception(f"Can't find integration_record for handler '{name}'")
        connection_data = integration_data.get("connection_data", {})
        logger.debug(
            "%s.get_handler: connection_data=%s, engine=%s",
            self.__class__.__name__,
            connection_data,
            integration_engine,
        )

        if integration_meta["import"]["success"] is False:
            # NOTE(review): inner indentation of these dedent strings could not be
            # recovered exactly from the coverage report — verify against VCS.
            msg = dedent(f"""\
                Handler '{integration_engine}' cannot be used. Reason is:
                    {integration_meta["import"]["error_message"]}
            """)
            is_cloud = Config().get("cloud", False)
            if is_cloud is False:
                msg += dedent(f"""

                    If error is related to missing dependencies, then try to run command in shell and restart mindsdb:
                        pip install mindsdb[{integration_engine}]
                """)
            logger.debug(msg)
            raise Exception(msg)

        connection_args = integration_meta.get("connection_args")
        logger.debug("%s.get_handler: connection args - %s", self.__class__.__name__, connection_args)

        file_storage = FileStorage(
            resource_group=RESOURCE_GROUP.INTEGRATION,
            resource_id=integration_record.id,
            sync=True,
        )
        handler_storage = HandlerStorage(integration_record.id)

        # PATH-type args hold only file names; resolve them to real paths in file storage
        if isinstance(connection_args, (dict, OrderedDict)):
            files_to_get = {
                arg_name: arg_value
                for arg_name, arg_value in connection_data.items()
                if arg_name in connection_args and connection_args.get(arg_name)["type"] == ARG_TYPE.PATH
            }
            if len(files_to_get) > 0:
                for file_name, file_path in files_to_get.items():
                    connection_data[file_name] = file_storage.get_path(file_path)

        handler_ars = self._make_handler_args(
            name=name,
            handler_type=integration_engine,
            connection_data=connection_data,
            integration_id=integration_data["id"],
            file_storage=file_storage,
            handler_storage=handler_storage,
        )

        HandlerClass = self.handler_modules[integration_engine].Handler
        handler = HandlerClass(**handler_ars)
        if connect:
            # NOTE(review): the handler is cached here but connect() itself is not
            # called — presumably connection happens lazily; confirm with HandlersCache.
            self.handlers_cache.set(handler)

        ctx.used_handlers.add(getattr(handler.__class__, "name", handler.__class__.__name__))
        return handler

503 

504 def reload_handler_module(self, handler_name): 

505 importlib.reload(self.handler_modules[handler_name]) 

506 try: 

507 handler_meta = self._get_handler_meta(handler_name) 

508 except Exception as e: 

509 handler_meta = self.handlers_import_status[handler_name] 

510 handler_meta["import"]["success"] = False 

511 handler_meta["import"]["error_message"] = str(e) 

512 

513 self.handlers_import_status[handler_name] = handler_meta 

514 

515 def _read_dependencies(self, path): 

516 dependencies = [] 

517 requirements_txt = Path(path).joinpath("requirements.txt") 

518 if requirements_txt.is_file(): 

519 with open(str(requirements_txt), "rt") as f: 

520 dependencies = [x.strip(" \t\n") for x in f.readlines()] 

521 dependencies = [x for x in dependencies if len(x) > 0] 

522 return dependencies 

523 

    def _get_handler_meta(self, handler_name):
        """Build metadata for an already-imported handler module.

        Reads attributes from the imported module (version, type, connection
        args, thread safety, ...) and merges them into the pre-collected
        import-status entry for this handler.

        Args:
            handler_name (str): key into self.handler_modules.

        Returns:
            dict: the updated handler metadata (same object stored in
            self.handlers_import_status).
        """
        module = self.handler_modules[handler_name]

        handler_dir = Path(module.__path__[0])
        handler_folder_name = handler_dir.name

        # handlers set `import_error` instead of raising when deps are missing
        import_error = getattr(module, "import_error", None)
        handler_meta = self.handlers_import_status[handler_name]
        handler_meta["import"]["success"] = import_error is None
        handler_meta["version"] = module.version
        handler_meta["thread_safe"] = getattr(module, "cache_thread_safe", False)

        if import_error is not None:
            handler_meta["import"]["error_message"] = str(import_error)

        handler_type = getattr(module, "type", None)
        handler_class = None
        if hasattr(module, "Handler") and inspect.isclass(module.Handler):
            handler_class = module.Handler
            # class_type from the base class; the separate APIHandler check
            # intentionally overrides a DatabaseHandler ("sql") result.
            if issubclass(handler_class, BaseMLEngine):
                handler_meta["class_type"] = "ml"
            elif issubclass(handler_class, DatabaseHandler):
                handler_meta["class_type"] = "sql"
            if issubclass(handler_class, APIHandler):
                handler_meta["class_type"] = "api"

        if handler_type == HANDLER_TYPE.ML:
            # for ml engines, patch the connection_args from the argument probing
            if handler_class:
                try:
                    prediction_args = handler_class.prediction_args()
                    creation_args = getattr(module, "creation_args", handler_class.creation_args())
                    connection_args = {"prediction": prediction_args, "creation_args": creation_args}
                    setattr(module, "connection_args", connection_args)
                    logger.debug("Patched connection_args for %s", handler_folder_name)
                except Exception as e:
                    # best effort — fall back to whatever the module declares
                    logger.debug("Failed to patch connection_args for %s, reason: %s", handler_folder_name, str(e))

        # copy selected module-level attributes into the metadata
        module_attrs = [
            attr
            for attr in ["connection_args_example", "connection_args", "description", "type", "title"]
            if hasattr(module, attr)
        ]

        for attr in module_attrs:
            handler_meta[attr] = getattr(module, attr)

        if hasattr(module, "permanent"):
            handler_meta["permanent"] = module.permanent
        else:
            # legacy built-ins are treated as permanent even without the flag
            if handler_meta.get("name") in ("files", "views", "lightwood"):
                handler_meta["permanent"] = True
            else:
                handler_meta["permanent"] = False

        return handler_meta

582 

583 def _get_handler_icon(self, handler_dir, icon_path): 

584 icon = {} 

585 try: 

586 icon_path = handler_dir.joinpath(icon_path) 

587 icon_type = icon_path.name[icon_path.name.rfind(".") + 1 :].lower() 

588 

589 if icon_type == "svg": 

590 with open(str(icon_path), "rt") as f: 

591 icon["data"] = f.read() 

592 else: 

593 with open(str(icon_path), "rb") as f: 

594 icon["data"] = base64.b64encode(f.read()).decode("utf-8") 

595 

596 icon["name"] = icon_path.name 

597 icon["type"] = icon_type 

598 

599 except Exception as e: 

600 logger.error(f"Error reading icon for {handler_dir}, {e}!") 

601 return icon 

602 

    def _load_handler_modules(self):
        """Scan the handlers directory and pre-collect per-handler metadata.

        Handlers are NOT imported here; only statically extractable info
        (parsed from source via ast) is gathered into
        self.handlers_import_status. Actual import happens in import_handler.
        """
        mindsdb_path = Path(importlib.util.find_spec("mindsdb").origin).parent
        handlers_path = mindsdb_path.joinpath("integrations/handlers")

        # edge case: running from tests directory, find_spec finds the base folder instead of actual package
        if not os.path.isdir(handlers_path):
            mindsdb_path = Path(importlib.util.find_spec("mindsdb").origin).parent.joinpath("mindsdb")
            handlers_path = mindsdb_path.joinpath("integrations/handlers")

        self.handler_modules = {}
        self.handlers_import_status = {}
        for handler_dir in handlers_path.iterdir():
            # skip plain files and dunder dirs (e.g. __pycache__)
            if handler_dir.is_dir() is False or handler_dir.name.startswith("__"):
                continue

            handler_info = self._get_handler_info(handler_dir)
            if "name" not in handler_info:
                # not a valid handler package
                continue
            handler_name = handler_info["name"]
            dependencies = self._read_dependencies(handler_dir)
            handler_meta = {
                "path": handler_dir,
                "import": {
                    "success": None,  # unknown until import_handler runs
                    "error_message": None,
                    "folder": handler_dir.name,
                    "dependencies": dependencies,
                },
                "name": handler_name,
                "permanent": handler_info.get("permanent", False),
                "connection_args": handler_info.get("connection_args", None),
                "class_type": handler_info.get("class_type", None),
                "type": handler_info.get("type"),
            }
            if "icon_path" in handler_info:
                icon = self._get_handler_icon(handler_dir, handler_info["icon_path"])
                if icon:
                    handler_meta["icon"] = icon
            self.handlers_import_status[handler_name] = handler_meta

642 

643 def _get_connection_args(self, args_file: Path, param_name: str) -> dict: 

644 """ 

645 Extract connection args dict from connection args file of a handler 

646 

647 :param args_file: path to file with connection args 

648 :param param_name: the name of variable which contains connection args 

649 :return: extracted connection arguments 

650 """ 

651 

652 code = ast.parse(args_file.read_text()) 

653 

654 args = {} 

655 for item in code.body: 

656 if not isinstance(item, ast.Assign): 

657 continue 

658 if not item.targets[0].id == param_name: 

659 continue 

660 if hasattr(item.value, "keywords"): 660 ↛ 655line 660 didn't jump to line 655 because the condition on line 660 was always true

661 for keyword in item.value.keywords: 

662 name = keyword.arg 

663 params = keyword.value 

664 if isinstance(params, ast.Dict): 664 ↛ 679line 664 didn't jump to line 679 because the condition on line 664 was always true

665 # get dict value 

666 info = {} 

667 for i, k in enumerate(params.keys): 

668 if not isinstance(k, ast.Constant): 668 ↛ 669line 668 didn't jump to line 669 because the condition on line 668 was never true

669 continue 

670 v = params.values[i] 

671 if isinstance(v, ast.Constant): 

672 v = v.value 

673 elif isinstance(v, ast.Attribute): 673 ↛ 677line 673 didn't jump to line 677 because the condition on line 673 was always true

674 # assume it is ARG_TYPE 

675 v = getattr(ARG_TYPE, v.attr, None) 

676 else: 

677 v = None 

678 info[k.value] = v 

679 args[name] = info 

680 return args 

681 

682 def _get_base_class_type(self, code, handler_dir: Path) -> Optional[str]: 

683 """ 

684 Find base class of data handler: sql or api 

685 It tries to find import inside try-except of init file 

686 They parsed this import in order to find base class of data handler 

687 

688 :param code: parsed code of __init__ file of a handler 

689 :param handler_dir: folder of a handler 

690 :return: base class type 

691 """ 

692 

693 module_file = None 

694 for block in code.body: 

695 if not isinstance(block, ast.Try): 

696 continue 

697 for item in block.body: 697 ↛ 694line 697 didn't jump to line 694 because the loop on line 697 didn't complete

698 if isinstance(item, ast.ImportFrom): 698 ↛ 697line 698 didn't jump to line 697 because the condition on line 698 was always true

699 module_file = item.module 

700 break 

701 if module_file is None: 

702 return 

703 

704 path = handler_dir / f"{module_file}.py" 

705 

706 if not path.exists(): 706 ↛ 707line 706 didn't jump to line 707 because the condition on line 706 was never true

707 return 

708 code = ast.parse(path.read_text()) 

709 # find base class of handler. 

710 # TODO trace inheritance (is used only for sql handler) 

711 for item in code.body: 

712 if isinstance(item, ast.ClassDef): 

713 bases = [base.id for base in item.bases] 

714 if "APIHandler" in bases or "MetaAPIHandler" in bases: 

715 return "api" 

716 return "sql" 

717 

718 def _get_handler_info(self, handler_dir: Path) -> dict: 

719 """ 

720 Get handler info without importing it 

721 :param handler_dir: folder of handler 

722 :return: Extracted params: 

723 - defined constants in init file 

724 - connection arguments 

725 """ 

726 

727 init_file = handler_dir / "__init__.py" 

728 if not init_file.exists(): 728 ↛ 729line 728 didn't jump to line 729 because the condition on line 728 was never true

729 return {} 

730 code = ast.parse(init_file.read_text()) 

731 

732 info = {} 

733 for item in code.body: 

734 if not isinstance(item, ast.Assign): 

735 continue 

736 if isinstance(item.targets[0], ast.Name): 736 ↛ 733line 736 didn't jump to line 733 because the condition on line 736 was always true

737 name = item.targets[0].id 

738 if isinstance(item.value, ast.Constant): 

739 info[name] = item.value.value 

740 if isinstance(item.value, ast.Attribute) and name == "type": 

741 if item.value.attr == "ML": 

742 info[name] = HANDLER_TYPE.ML 

743 info["class_type"] = "ml" 

744 else: 

745 info[name] = HANDLER_TYPE.DATA 

746 info["class_type"] = self._get_base_class_type(code, handler_dir) or "sql" 

747 

748 # connection args 

749 if info["type"] == HANDLER_TYPE.ML: 

750 args_file = handler_dir / "creation_args.py" 

751 if args_file.exists(): 

752 info["connection_args"] = { 

753 "prediction": {}, 

754 "creation_args": self._get_connection_args(args_file, "creation_args"), 

755 } 

756 else: 

757 args_file = handler_dir / "connection_args.py" 

758 if args_file.exists(): 

759 info["connection_args"] = self._get_connection_args(args_file, "connection_args") 

760 

761 return info 

762 

763 def import_handler(self, handler_name: str, base_import: str = None): 

764 with self._import_lock: 

765 time_before_import = time.perf_counter() 

766 logger.debug(f"Importing handler '{handler_name}'") 

767 handler_meta = self.handlers_import_status[handler_name] 

768 handler_dir = handler_meta["path"] 

769 

770 handler_folder_name = str(handler_dir.name) 

771 if base_import is None: 

772 base_import = "mindsdb.integrations.handlers." 

773 

774 try: 

775 handler_module = importlib.import_module(f"{base_import}{handler_folder_name}") 

776 self.handler_modules[handler_name] = handler_module 

777 handler_meta = self._get_handler_meta(handler_name) 

778 logger.debug( 

779 f"Handler '{handler_name}' imported successfully in {(time.perf_counter() - time_before_import):.3f} seconds" 

780 ) 

781 except Exception as e: 

782 handler_meta["import"]["success"] = False 

783 handler_meta["import"]["error_message"] = str(e) 

784 logger.debug(f"Failed to import handler '{handler_name}': {e}") 

785 

786 self.handlers_import_status[handler_meta["name"]] = handler_meta 

787 return handler_meta 

788 

789 def get_handlers_import_status(self): 

790 # tries to import all not imported yet 

791 

792 result = {} 

793 for handler_name in list(self.handlers_import_status.keys()): 

794 handler_meta = self.get_handler_meta(handler_name) 

795 result[handler_name] = handler_meta 

796 

797 return result 

798 

799 def get_handlers_metadata(self): 

800 return self.handlers_import_status 

801 

802 def get_handler_metadata(self, handler_name): 

803 # returns metadata 

804 return self.handlers_import_status.get(handler_name) 

805 

806 def get_handler_meta(self, handler_name): 

807 # returns metadata and tries to import it 

808 handler_meta = self.handlers_import_status.get(handler_name) 

809 if handler_meta is None: 809 ↛ 810line 809 didn't jump to line 810 because the condition on line 809 was never true

810 return 

811 if handler_meta["import"]["success"] is None: 

812 handler_meta = self.import_handler(handler_name) 

813 return handler_meta 

814 

815 def get_handler_module(self, handler_name): 

816 handler_meta = self.get_handler_meta(handler_name) 

817 if handler_meta is None: 817 ↛ 818line 817 didn't jump to line 818 because the condition on line 817 was never true

818 return 

819 if handler_meta["import"]["success"]: 819 ↛ exitline 819 didn't return from function 'get_handler_module' because the condition on line 819 was always true

820 return self.handler_modules[handler_name] 

821 

822 def create_permanent_integrations(self): 

823 for ( 

824 integration_name, 

825 handler, 

826 ) in self.get_handlers_metadata().items(): 

827 if handler.get("permanent"): 

828 integration_meta = integration_controller.get(name=integration_name) 

829 if integration_meta is None: 

830 integration_record = db.Integration( 

831 name=integration_name, 

832 data={}, 

833 engine=integration_name, 

834 company_id=None, 

835 ) 

836 db.session.add(integration_record) 

837 db.session.commit() 

838 

839 

# Module-level singleton used across the application to manage database
# integrations; constructing it discovers handler modules and sets up the
# handlers cache (see IntegrationController.__init__).
integration_controller = IntegrationController()