Coverage for mindsdb / interfaces / database / projects.py: 38%

312 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import datetime 

2from copy import deepcopy 

3from typing import List, Optional 

4from collections import OrderedDict 

5 

6import pandas as pd 

7import sqlalchemy as sa 

8import numpy as np 

9 

10from mindsdb_sql_parser.ast.base import ASTNode 

11from mindsdb_sql_parser.ast import Select, Star, Constant, Identifier, BinaryOperation 

12from mindsdb_sql_parser import parse_sql 

13 

14from mindsdb.interfaces.storage import db 

15from mindsdb.interfaces.model.model_controller import ModelController 

16from mindsdb.interfaces.database.views import ViewController 

17from mindsdb.utilities import log 

18from mindsdb.utilities.config import Config 

19from mindsdb.utilities.context import context as ctx 

20from mindsdb.utilities.exception import EntityExistsError, EntityNotExistsError 

21import mindsdb.utilities.profiler as profiler 

22from mindsdb.api.executor.sql_query import SQLQuery 

23from mindsdb.api.executor.utilities.sql import query_df 

24from mindsdb.interfaces.query_context.context_controller import query_context_controller 

25 

26logger = log.getLogger(__name__) 

27 

28 

29class Project: 

30 @staticmethod 

31 def from_record(db_record: db.Project): 

32 p = Project() 

33 p.record = db_record 

34 p.name = db_record.name 

35 p.company_id = ctx.company_id 

36 p.id = db_record.id 

37 p.metadata = db_record.metadata_ 

38 return p 

39 

40 def create(self, name: str): 

41 company_id = ctx.company_id if ctx.company_id is not None else "0" 

42 

43 existing_record = db.Integration.query.filter( 

44 db.Integration.name == name, db.Integration.company_id == ctx.company_id 

45 ).first() 

46 if existing_record is not None: 

47 raise EntityExistsError("Database exists with this name ", name) 

48 

49 existing_record = db.Project.query.filter( 

50 (db.Project.name == name) & (db.Project.company_id == company_id) & (db.Project.deleted_at == sa.null()) 

51 ).first() 

52 if existing_record is not None: 

53 raise EntityExistsError("Project already exists", name) 

54 

55 record = db.Project(name=name, company_id=company_id) 

56 

57 self.record = record 

58 self.name = name 

59 self.company_id = company_id 

60 

61 db.session.add(record) 

62 db.session.commit() 

63 

64 self.id = record.id 

65 

66 def delete(self): 

67 if self.record.metadata_ and self.record.metadata_.get("is_default", False): 

68 raise Exception( 

69 f"Project '{self.name}' can not be deleted, because it is default project." 

70 "The default project can be changed in the config file or by setting the environment variable MINDSDB_DEFAULT_PROJECT." 

71 ) 

72 

73 tables = self.get_tables() 

74 tables = [key for key, val in tables.items() if val["type"] != "table"] 

75 if len(tables) > 0: 

76 raise Exception( 

77 f"Project '{self.name}' can not be deleted, because it contains tables: {', '.join(tables)}" 

78 ) 

79 

80 is_cloud = Config().get("cloud", False) 

81 if is_cloud is True: 

82 self.record.deleted_at = datetime.datetime.now() 

83 else: 

84 db.session.delete(self.record) 

85 self.record = None 

86 self.name = None 

87 self.company_id = None 

88 self.id = None 

89 db.session.commit() 

90 

91 def drop_model(self, name: str): 

92 ModelController().delete_model(name, project_name=self.name) 

93 

94 def drop_view(self, name: str, strict_case: bool = False) -> None: 

95 """Remove a view with the specified name from the current project. 

96 

97 Args: 

98 name (str): The name of the view to remove. 

99 strict_case (bool, optional): If True, the view name is case-sensitive. Defaults to False. 

100 

101 Raises: 

102 EntityNotExistsError: If the view does not exist. 

103 

104 Returns: 

105 None 

106 """ 

107 ViewController().delete(name, project_name=self.name, strict_case=strict_case) 

108 

109 def create_view(self, name: str, query: str, session): 

110 ast_query = parse_sql(query) 

111 

112 if isinstance(ast_query, Select): 112 ↛ 122line 112 didn't jump to line 122 because the condition on line 112 was always true

113 # check create view sql 

114 ast_query.limit = Constant(1) 

115 

116 query_context_controller.set_context(query_context_controller.IGNORE_CONTEXT) 

117 try: 

118 SQLQuery(ast_query, session=session, database=self.name) 

119 finally: 

120 query_context_controller.release_context(query_context_controller.IGNORE_CONTEXT) 

121 

122 ViewController().add(name, query=query, project_name=self.name) 

123 

124 def update_view(self, name: str, query: str, strict_case: bool = False): 

125 ViewController().update(name, query=query, project_name=self.name, strict_case=strict_case) 

126 

127 def delete_view(self, name: str): 

128 ViewController().delete(name, project_name=self.name) 

129 

130 def get_view_meta(self, query: ASTNode) -> ASTNode: 

131 view_name = query.from_table.parts[-1] 

132 view_meta = ViewController().get(name=view_name, project_name=self.name) 

133 view_meta["query_ast"] = parse_sql(view_meta["query"]) 

134 return view_meta 

135 

136 @staticmethod 

137 def combine_view_select(view_query: Select, query: Select) -> Select: 

138 """ 

139 Create a combined query from view's query and outer query. 

140 """ 

141 

142 # apply optimizations 

143 if query.where is not None: 

144 # Get conditions that can be duplicated into view's query 

145 # It has to be simple condition with identifier and constant 

146 # Also it shouldn't be under the OR condition 

147 

148 def get_conditions_to_move(node): 

149 if not isinstance(node, BinaryOperation): 

150 return [] 

151 op = node.op.upper() 

152 if op == "AND": 

153 conditions = [] 

154 conditions.extend(get_conditions_to_move(node.args[0])) 

155 conditions.extend(get_conditions_to_move(node.args[1])) 

156 return conditions 

157 

158 if op == "OR": 

159 return [] 

160 if isinstance(node.args[0], (Identifier, Constant)) and isinstance( 

161 node.args[1], (Identifier, Constant) 

162 ): 

163 return [node] 

164 

165 conditions = get_conditions_to_move(query.where) 

166 

167 if conditions: 

168 # analyse targets 

169 # if target element has alias 

170 # if element is not identifier or the name is not equal to alias: 

171 # add alias to black list 

172 # white list: 

173 # all targets that are identifiers with no alias or equal to its alias 

174 # condition can be moved if 

175 # column is not in black list AND (query has star(*) OR column in white list) 

176 

177 has_star = False 

178 white_list, black_list = [], [] 

179 for target in view_query.targets: 

180 if isinstance(target, Star): 

181 has_star = True 

182 if isinstance(target, Identifier): 

183 name = target.parts[-1].lower() 

184 if target.alias is None or target.alias.parts[-1].lower() == name: 

185 white_list.append(name) 

186 elif target.alias is not None: 

187 black_list.append(target.alias.parts[-1].lower()) 

188 

189 view_where = view_query.where 

190 for condition in conditions: 

191 arg1, arg2 = condition.args 

192 

193 if isinstance(arg1, Identifier): 

194 name = arg1.parts[-1].lower() 

195 if name in black_list or not (has_star or name in white_list): 

196 continue 

197 if isinstance(arg2, Identifier): 

198 name = arg2.parts[-1].lower() 

199 if name in black_list or not (has_star or name in white_list): 

200 continue 

201 

202 # condition can be moved into view 

203 condition2 = BinaryOperation(condition.op, [arg1, arg2]) 

204 if view_where is None: 

205 view_where = condition2 

206 else: 

207 view_where = BinaryOperation("AND", args=[view_where, condition2]) 

208 

209 # disable outer condition 

210 condition.op = "=" 

211 condition.args = [Constant(0), Constant(0)] 

212 

213 view_query.where = view_where 

214 

215 # combine outer query with view's query 

216 view_query.parentheses = True 

217 query.from_table = view_query 

218 return query 

219 

220 def query_view(self, query: Select, session) -> pd.DataFrame: 

221 view_meta = self.get_view_meta(query) 

222 

223 query_context_controller.set_context("view", view_meta["id"]) 

224 query_applied = False 

225 try: 

226 view_query = view_meta["query_ast"] 

227 if isinstance(view_query, Select): 

228 view_query = self.combine_view_select(view_query, query) 

229 query_applied = True 

230 

231 sqlquery = SQLQuery(view_query, session=session) 

232 df = sqlquery.fetched_data.to_df() 

233 finally: 

234 query_context_controller.release_context("view", view_meta["id"]) 

235 

236 # remove duplicated columns 

237 df = df.loc[:, ~df.columns.duplicated()] 

238 if query_applied: 

239 return df 

240 else: 

241 return query_df(df, query, session=session) 

242 

243 @staticmethod 

244 def _get_model_data(predictor_record, integraion_record, with_secrets: bool = True): 

245 from mindsdb.interfaces.database.integrations import integration_controller 

246 

247 predictor_data = predictor_record.data or {} 

248 training_time = None 

249 if ( 249 ↛ 254line 249 didn't jump to line 254 because the condition on line 249 was never true

250 predictor_record.training_start_at is not None 

251 and predictor_record.training_stop_at is None 

252 and predictor_record.status != "error" 

253 ): 

254 training_time = round((datetime.datetime.now() - predictor_record.training_start_at).total_seconds(), 3) 

255 elif predictor_record.training_start_at is not None and predictor_record.training_stop_at is not None: 255 ↛ 261line 255 didn't jump to line 261 because the condition on line 255 was always true

256 training_time = round( 

257 (predictor_record.training_stop_at - predictor_record.training_start_at).total_seconds(), 3 

258 ) 

259 

260 # regon Hide sensitive info 

261 training_options = predictor_record.learn_args 

262 handler_module = integration_controller.get_handler_module(integraion_record.engine) 

263 

264 if with_secrets is False and handler_module: 264 ↛ 265line 264 didn't jump to line 265 because the condition on line 264 was never true

265 model_using_args = getattr(handler_module, "model_using_args", None) 

266 if ( 

267 isinstance(model_using_args, dict) 

268 and isinstance(training_options, dict) 

269 and isinstance(training_options.get("using"), dict) 

270 ): 

271 training_options["using"] = deepcopy(training_options["using"]) 

272 for key, value in model_using_args.items(): 

273 if key in training_options["using"] and value.get("secret", False): 

274 training_options["using"][key] = "******" 

275 # endregion 

276 

277 predictor_meta = { 

278 "type": "model", 

279 "id": predictor_record.id, 

280 "engine": integraion_record.engine, 

281 "engine_name": integraion_record.name, 

282 "active": predictor_record.active, 

283 "version": predictor_record.version, 

284 "status": predictor_record.status, 

285 "accuracy": None, 

286 "predict": predictor_record.to_predict[0], 

287 "update_status": predictor_record.update_status, 

288 "mindsdb_version": predictor_record.mindsdb_version, 

289 "error": predictor_data.get("error"), 

290 "select_data_query": predictor_record.fetch_data_query, 

291 "training_options": training_options, 

292 "deletable": True, 

293 "label": predictor_record.label, 

294 "current_training_phase": predictor_record.training_phase_current, 

295 "total_training_phases": predictor_record.training_phase_total, 

296 "training_phase_name": predictor_record.training_phase_name, 

297 "training_time": training_time, 

298 } 

299 if predictor_data.get("accuracies", None) is not None: 299 ↛ 300line 299 didn't jump to line 300 because the condition on line 299 was never true

300 if len(predictor_data["accuracies"]) > 0: 

301 predictor_meta["accuracy"] = float(np.mean(list(predictor_data["accuracies"].values()))) 

302 return {"name": predictor_record.name, "metadata": predictor_meta, "created_at": predictor_record.created_at} 

303 

304 def get_model(self, name: str): 

305 record = ( 

306 db.session.query(db.Predictor, db.Integration) 

307 .filter_by(project_id=self.id, active=True, name=name, deleted_at=sa.null(), company_id=ctx.company_id) 

308 .join(db.Integration, db.Integration.id == db.Predictor.integration_id) 

309 .order_by(db.Predictor.name, db.Predictor.id) 

310 .first() 

311 ) 

312 if record is None: 

313 return None 

314 return self._get_model_data(record[0], record[1]) 

315 

316 def get_model_by_id(self, model_id: int): 

317 record = ( 

318 db.session.query(db.Predictor, db.Integration) 

319 .filter_by(project_id=self.id, id=model_id, deleted_at=sa.null(), company_id=ctx.company_id) 

320 .join(db.Integration, db.Integration.id == db.Predictor.integration_id) 

321 .order_by(db.Predictor.name, db.Predictor.id) 

322 .first() 

323 ) 

324 if record is None: 324 ↛ 325line 324 didn't jump to line 325 because the condition on line 324 was never true

325 return None 

326 return self._get_model_data(record[0], record[1]) 

327 

328 def get_models(self, active: bool = True, with_secrets: bool = True): 

329 query = db.session.query(db.Predictor, db.Integration).filter_by( 

330 project_id=self.id, deleted_at=sa.null(), company_id=ctx.company_id 

331 ) 

332 if isinstance(active, bool): 

333 query = query.filter_by(active=active) 

334 

335 query = query.join(db.Integration, db.Integration.id == db.Predictor.integration_id).order_by( 

336 db.Predictor.name, db.Predictor.id 

337 ) 

338 

339 data = [] 

340 

341 for predictor_record, integraion_record in query.all(): 

342 data.append(self._get_model_data(predictor_record, integraion_record, with_secrets)) 

343 

344 return data 

345 

346 def get_agents(self): 

347 records = ( 

348 db.session.query(db.Agents) 

349 .filter( 

350 db.Agents.project_id == self.id, 

351 db.Agents.company_id == ctx.company_id, 

352 db.Agents.deleted_at == sa.null(), 

353 ) 

354 .order_by(db.Agents.name) 

355 .all() 

356 ) 

357 data = [ 

358 { 

359 "name": record.name, 

360 "query": record.query, 

361 "metadata": {"type": "agent", "id": record.id, "deletable": True}, 

362 } 

363 for record in records 

364 ] 

365 return data 

366 

367 def get_knowledge_bases(self): 

368 from mindsdb.api.executor.controllers.session_controller import SessionController 

369 

370 session = SessionController() 

371 

372 return { 

373 kb["name"]: {"type": "knowledge_base", "id": kb["id"], "deletable": True} 

374 for kb in session.kb_controller.list(self.name) 

375 } 

376 

377 def get_views(self): 

378 records = ( 

379 db.session.query(db.View) 

380 .filter_by(project_id=self.id, company_id=ctx.company_id) 

381 .order_by(db.View.name, db.View.id) 

382 .all() 

383 ) 

384 data = [ 

385 { 

386 "name": view_record.name, 

387 "query": view_record.query, 

388 "metadata": {"type": "view", "id": view_record.id, "deletable": True}, 

389 } 

390 for view_record in records 

391 ] 

392 return data 

393 

394 def get_view(self, name: str, strict_case: bool = False) -> dict | None: 

395 """Get a view by name from the current project. 

396 

397 Args: 

398 name (str): The name of the view to retrieve. 

399 strict_case (bool, optional): If True, the view name is case-sensitive. Defaults to False. 

400 

401 Returns: 

402 dict | None: A dictionary with view information if found, otherwise None. 

403 """ 

404 query = db.session.query(db.View).filter( 

405 db.View.project_id == self.id, 

406 db.View.company_id == ctx.company_id, 

407 ) 

408 if strict_case: 408 ↛ 409line 408 didn't jump to line 409 because the condition on line 408 was never true

409 query = query.filter(db.View.name == name) 

410 else: 

411 query = query.filter(sa.func.lower(db.View.name) == name.lower()) 

412 

413 view_record = query.one_or_none() 

414 

415 if view_record is None: 

416 return None 

417 return { 

418 "name": view_record.name, 

419 "query": view_record.query, 

420 "metadata": {"type": "view", "id": view_record.id, "deletable": True}, 

421 } 

422 

423 @profiler.profile() 

424 def get_tables(self): 

425 data = OrderedDict() 

426 data["models"] = {"type": "table", "deletable": False} 

427 

428 models = self.get_models() 

429 for model in models: 

430 if model["metadata"]["active"] is True: 430 ↛ 429line 430 didn't jump to line 429 because the condition on line 430 was always true

431 data[model["name"]] = model["metadata"] 

432 

433 views = self.get_views() 

434 for view in views: 434 ↛ 435line 434 didn't jump to line 435 because the loop on line 434 never started

435 data[view["name"]] = view["metadata"] 

436 

437 agents = self.get_agents() 

438 for agent in agents: 438 ↛ 439line 438 didn't jump to line 439 because the loop on line 438 never started

439 data[agent["name"]] = agent["metadata"] 

440 

441 data.update(self.get_knowledge_bases()) 

442 

443 return data 

444 

445 def get_columns(self, table_name: str) -> list[str] | None: 

446 columns = [] 

447 tables = self.get_tables() 

448 table = None 

449 for key, value in tables.items(): 

450 if key.lower() == table_name.lower(): 

451 table_name = key 

452 table = value 

453 if table is None: 

454 return columns 

455 

456 match str(table["type"]).upper(): 

457 case "MODEL": 

458 predictor_record = db.Predictor.query.filter_by( 

459 company_id=ctx.company_id, project_id=self.id, name=table_name 

460 ).first() 

461 columns = [] 

462 if predictor_record is not None: 

463 if isinstance(predictor_record.dtype_dict, dict): 

464 columns = list(predictor_record.dtype_dict.keys()) 

465 elif predictor_record.to_predict is not None: 

466 # no dtype_dict, use target 

467 columns = predictor_record.to_predict 

468 if not isinstance(columns, list): 

469 columns = [columns] 

470 case "VIEW": 

471 query = Select(targets=[Star()], from_table=Identifier(table_name), limit=Constant(1)) 

472 

473 from mindsdb.api.executor.controllers.session_controller import SessionController 

474 

475 session = SessionController() 

476 session.database = self.name 

477 df = self.query_view(query, session) 

478 columns = df.columns 

479 case "AGENT": 

480 agent = db.Agents.query.filter_by( 

481 company_id=ctx.company_id, project_id=self.id, name=table_name 

482 ).first() 

483 if agent is not None: 

484 from mindsdb.interfaces.agents.constants import ASSISTANT_COLUMN, USER_COLUMN 

485 

486 columns = [ASSISTANT_COLUMN, USER_COLUMN] 

487 case "KNOWLEDGE_BASE": 

488 columns = ["id", "chunk_id", "chunk_content", "metadata", "relevance", "distance"] 

489 case "TABLE": 

490 # like 'mindsdb.models' 

491 pass 

492 case _: 

493 logger.warning(f"Unknown table type: {table['type']}") 

494 

495 return columns 

496 

497 

class ProjectController:
    """Lookup, creation and renaming of ``Project`` objects for the current company.

    The company is taken from ``ctx.company_id``, falling back to ``"0"`` when
    the context has none.
    """

    def __init__(self):
        pass

    def get_list(self) -> List[Project]:
        """List all non-deleted projects of the current company, ordered by name.

        Returns:
            List[Project]: One ``Project`` per matching row.
        """
        company_id = ctx.company_id if ctx.company_id is not None else "0"
        records = db.Project.query.filter(
            (db.Project.company_id == company_id) & (db.Project.deleted_at == sa.null())
        ).order_by(db.Project.name)

        return [Project.from_record(x) for x in records]

    def get(
        self,
        id: int | None = None,
        name: str | None = None,
        deleted: bool = False,
        is_default: bool = False,
        strict_case: bool = False,
    ) -> Project:
        """Get a project by id or name.

        Args:
            id (int | None, optional): The id of the project to retrieve. Cannot be used with 'name'.
            name (str | None, optional): The name of the project to retrieve. Cannot be used with 'id'.
            deleted (bool, optional): If True, only projects with ``deleted_at`` set are
                matched; otherwise only non-deleted ones. Defaults to False.
            is_default (bool, optional): If True, only return the default project. Defaults to False.
            strict_case (bool, optional): If True, the project name is case-sensitive. Defaults to False.

        Raises:
            ValueError: If both 'id' and 'name' are provided.
            EntityNotExistsError: If the project is not found.

        Returns:
            Project: The project instance matching the given criteria.
        """
        if id is not None and name is not None:
            raise ValueError("Both 'id' and 'name' can't be provided at the same time")

        company_id = ctx.company_id if ctx.company_id is not None else "0"
        q = db.Project.query.filter_by(company_id=company_id)

        if id is not None:
            q = q.filter_by(id=id)
        elif name is not None:
            if strict_case:
                q = q.filter(db.Project.name == name)
            else:
                q = q.filter(sa.func.lower(db.Project.name) == sa.func.lower(name))

        if deleted is True:
            q = q.filter(db.Project.deleted_at != sa.null())
        else:
            q = q.filter_by(deleted_at=sa.null())

        if is_default:
            q = q.filter(db.Project.metadata_["is_default"].as_boolean() == is_default)

        record = q.first()

        if record is None:
            # Report whichever key the caller searched by.
            searched_by = name if name is not None else id
            raise EntityNotExistsError(f"Project not found: {searched_by}")
        return Project.from_record(record)

    def add(self, name: str) -> Project:
        """Create a new project.

        Args:
            name (str): The name of the project to create.

        Returns:
            Project: The newly created project.
        """
        project = Project()
        project.create(name=name)
        return project

    def update(self, id: Optional[int] = None, name: Optional[str] = None, new_name: Optional[str] = None) -> Project:
        """Rename a project identified by id or by name.

        Args:
            id (Optional[int]): The id of the project to update. Cannot be used with 'name'.
            name (Optional[str]): The name of the project to update. Cannot be used with 'id'.
            new_name (Optional[str]): The new name; if None, nothing is changed.

        Raises:
            ValueError: If both 'id' and 'name' are provided, or neither is.
            EntityNotExistsError: If the project is not found.

        Returns:
            Project: The (possibly renamed) project.
        """
        if id is not None and name is not None:
            raise ValueError("Both 'id' and 'name' can't be provided at the same time")
        if id is None and name is None:
            # Without a key, get() would match an arbitrary project of the
            # company and that project would be renamed.
            raise ValueError("Either 'id' or 'name' must be provided")

        if id is not None:
            project = self.get(id=id)
        else:
            project = self.get(name=name)

        if new_name is not None:
            project.name = new_name
            project.record.name = new_name

        db.session.commit()
        return project