Coverage for mindsdb / interfaces / database / projects.py: 38%
312 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import datetime
2from copy import deepcopy
3from typing import List, Optional
4from collections import OrderedDict
6import pandas as pd
7import sqlalchemy as sa
8import numpy as np
10from mindsdb_sql_parser.ast.base import ASTNode
11from mindsdb_sql_parser.ast import Select, Star, Constant, Identifier, BinaryOperation
12from mindsdb_sql_parser import parse_sql
14from mindsdb.interfaces.storage import db
15from mindsdb.interfaces.model.model_controller import ModelController
16from mindsdb.interfaces.database.views import ViewController
17from mindsdb.utilities import log
18from mindsdb.utilities.config import Config
19from mindsdb.utilities.context import context as ctx
20from mindsdb.utilities.exception import EntityExistsError, EntityNotExistsError
21import mindsdb.utilities.profiler as profiler
22from mindsdb.api.executor.sql_query import SQLQuery
23from mindsdb.api.executor.utilities.sql import query_df
24from mindsdb.interfaces.query_context.context_controller import query_context_controller
# Module-level logger, named after this module and obtained from mindsdb.utilities.log.
logger = log.getLogger(__name__)
29class Project:
@staticmethod
def from_record(db_record: db.Project):
    """Wrap an existing project row in a ``Project`` instance.

    Args:
        db_record (db.Project): The stored project row to wrap.

    Returns:
        Project: Instance whose ``record``, ``name``, ``id`` and ``metadata`` are taken
        from the row, and whose ``company_id`` is taken from the current context.
    """
    project = Project()
    project.record = db_record
    project.id = db_record.id
    project.name = db_record.name
    project.metadata = db_record.metadata_
    # The company comes from the active context rather than from the row itself.
    project.company_id = ctx.company_id
    return project
def create(self, name: str):
    """Create and persist a new project called *name*.

    Args:
        name (str): Name of the project to create.

    Raises:
        EntityExistsError: If an integration with this name exists for the context
            company, or a non-deleted project with this name already exists.
    """
    # Projects are stored under company "0" when the context carries no company.
    company_id = "0" if ctx.company_id is None else ctx.company_id

    # The integration lookup uses the raw context company id, not the "0" fallback.
    integration_clash = db.Integration.query.filter(
        db.Integration.name == name,
        db.Integration.company_id == ctx.company_id,
    ).first()
    if integration_clash is not None:
        raise EntityExistsError("Database exists with this name ", name)

    # Soft-deleted projects (deleted_at set) do not block re-use of the name.
    project_clash = db.Project.query.filter(
        db.Project.name == name,
        db.Project.company_id == company_id,
        db.Project.deleted_at == sa.null(),
    ).first()
    if project_clash is not None:
        raise EntityExistsError("Project already exists", name)

    new_record = db.Project(name=name, company_id=company_id)
    self.record = new_record
    self.name = name
    self.company_id = company_id

    db.session.add(new_record)
    db.session.commit()

    # The id is read back from the record after the commit.
    self.id = new_record.id
def delete(self):
    """Delete this project.

    The project is refused for deletion when its metadata marks it as the default
    project, or when ``get_tables()`` reports any entry whose type is not ``"table"``.
    When the ``cloud`` config value is ``True`` the row is kept and only stamped with
    ``deleted_at``; otherwise the row is deleted. In both cases the instance
    attributes are cleared and the session is committed.

    Raises:
        Exception: If the project is the default project or still contains objects.
    """
    record_meta = self.record.metadata_
    if record_meta and record_meta.get("is_default", False):
        # The two literals are concatenated, so the first one must end with a space.
        raise Exception(
            f"Project '{self.name}' can not be deleted, because it is default project. "
            "The default project can be changed in the config file or by setting the environment variable MINDSDB_DEFAULT_PROJECT."
        )

    # Entries of type "table" (such as the built-in `models` entry) do not block deletion.
    tables = [key for key, val in self.get_tables().items() if val["type"] != "table"]
    if tables:
        raise Exception(
            f"Project '{self.name}' can not be deleted, because it contains tables: {', '.join(tables)}"
        )

    if Config().get("cloud", False) is True:
        # Soft delete: keep the row and mark it as deleted.
        self.record.deleted_at = datetime.datetime.now()
    else:
        db.session.delete(self.record)

    self.record = None
    self.name = None
    self.company_id = None
    self.id = None
    db.session.commit()
def drop_model(self, name: str):
    """Delete the model called *name* from this project through ``ModelController``.

    Args:
        name (str): Name of the model to delete.
    """
    controller = ModelController()
    controller.delete_model(name, project_name=self.name)
def drop_view(self, name: str, strict_case: bool = False) -> None:
    """Delete a view of this project by name.

    Args:
        name (str): Name of the view to delete.
        strict_case (bool, optional): If True, the view name is matched
            case-sensitively. Defaults to False.

    Raises:
        EntityNotExistsError: If the view does not exist.

    Returns:
        None
    """
    view_controller = ViewController()
    view_controller.delete(name, project_name=self.name, strict_case=strict_case)
def create_view(self, name: str, query: str, session):
    """Validate a view definition and register it in this project.

    Args:
        name (str): Name of the view to create.
        query (str): SQL text that defines the view.
        session: Session object handed to ``SQLQuery`` for the validation run.
    """
    parsed = parse_sql(query)

    if isinstance(parsed, Select):
        # Trial run of the SELECT, limited to one row; only the parsed copy is
        # modified, the stored view keeps the original query text.
        parsed.limit = Constant(1)

        ignore_context = query_context_controller.IGNORE_CONTEXT
        query_context_controller.set_context(ignore_context)
        try:
            SQLQuery(parsed, session=session, database=self.name)
        finally:
            # Always release the context, even when the trial run fails.
            query_context_controller.release_context(ignore_context)

    ViewController().add(name, query=query, project_name=self.name)
def update_view(self, name: str, query: str, strict_case: bool = False):
    """Replace the query of an existing view in this project.

    Args:
        name (str): Name of the view to update.
        query (str): New SQL text for the view.
        strict_case (bool, optional): Forwarded to ``ViewController.update``.
            Defaults to False.
    """
    view_controller = ViewController()
    view_controller.update(name, query=query, project_name=self.name, strict_case=strict_case)
def delete_view(self, name: str):
    """Delete the view called *name* from this project.

    Unlike ``drop_view`` this passes no ``strict_case`` argument, so the
    controller's own default applies.

    Args:
        name (str): Name of the view to delete.
    """
    view_controller = ViewController()
    view_controller.delete(name, project_name=self.name)
def get_view_meta(self, query: ASTNode) -> dict:
    """Load the metadata of the view that *query* selects from.

    Args:
        query (ASTNode): Query whose ``from_table`` identifies the view; the last
            part of that identifier is used as the view name.

    Returns:
        dict: The mapping returned by ``ViewController.get`` with an added
        ``"query_ast"`` entry holding the parsed form of its ``"query"`` text.
    """
    view_name = query.from_table.parts[-1]
    view_meta = ViewController().get(name=view_name, project_name=self.name)
    view_meta["query_ast"] = parse_sql(view_meta["query"])
    return view_meta
@staticmethod
def combine_view_select(view_query: Select, query: Select) -> Select:
    """Create a combined query from the view's query and the outer query.

    The view's SELECT becomes the parenthesised FROM source of the outer query.
    As an optimization, simple outer WHERE conditions are copied into the view's
    WHERE clause and the outer copy is replaced with the no-op ``0 = 0``.

    Both arguments are modified in place.

    Args:
        view_query (Select): Parsed SELECT that defines the view.
        query (Select): Outer SELECT issued against the view.

    Returns:
        Select: The outer ``query`` with ``from_table`` set to ``view_query``.
    """

    def get_conditions_to_move(node):
        """Return the simple comparisons of a WHERE tree reachable through AND only."""
        if not isinstance(node, BinaryOperation):
            return []
        op = node.op.upper()
        if op == "AND":
            return get_conditions_to_move(node.args[0]) + get_conditions_to_move(node.args[1])
        if op == "OR":
            # Anything under an OR cannot be applied independently.
            return []
        if isinstance(node.args[0], (Identifier, Constant)) and isinstance(
            node.args[1], (Identifier, Constant)
        ):
            return [node]
        # Any other comparison (e.g. against a tuple or nested expression) is not
        # moved. Returning a list here keeps the AND branch from failing on None.
        return []

    # apply optimizations
    conditions = get_conditions_to_move(query.where) if query.where is not None else []

    if conditions:
        # White list: identifier targets without alias, or whose alias equals the name.
        # Black list: aliases of every other aliased target (expressions, or
        # identifiers renamed to something else), because the alias names a column
        # that only exists outside the view's query.
        # A condition can be moved if no identifier in it is black-listed AND
        # (the view selects `*` OR the identifier is white-listed).
        has_star = False
        white_list, black_list = set(), set()
        for target in view_query.targets:
            if isinstance(target, Star):
                has_star = True

            alias = getattr(target, "alias", None)
            alias_name = alias.parts[-1].lower() if alias is not None else None

            if isinstance(target, Identifier):
                name = target.parts[-1].lower()
                if alias_name is None or alias_name == name:
                    white_list.add(name)
                    continue
            if alias_name is not None:
                black_list.add(alias_name)

        def can_move(arg):
            """Tell whether one side of a condition may be evaluated inside the view."""
            if not isinstance(arg, Identifier):
                return True
            name = arg.parts[-1].lower()
            return name not in black_list and (has_star or name in white_list)

        # NOTE(review): conditions are pushed regardless of LIMIT/GROUP BY in the
        # view's query, which can change which rows are returned - confirm intended.
        view_where = view_query.where
        for condition in conditions:
            arg1, arg2 = condition.args
            if not (can_move(arg1) and can_move(arg2)):
                continue

            # condition can be moved into view
            moved = BinaryOperation(condition.op, [arg1, arg2])
            if view_where is None:
                view_where = moved
            else:
                view_where = BinaryOperation("AND", args=[view_where, moved])

            # disable outer condition
            condition.op = "="
            condition.args = [Constant(0), Constant(0)]

        view_query.where = view_where

    # combine outer query with view's query
    view_query.parentheses = True
    query.from_table = view_query
    return query
def query_view(self, query: Select, session) -> pd.DataFrame:
    """Run *query* against a view of this project and return the result.

    Args:
        query (Select): Outer query whose ``from_table`` names the view.
        session: Session object passed on to ``SQLQuery`` and ``query_df``.

    Returns:
        pd.DataFrame: Result rows with duplicated column names removed.
    """
    view_meta = self.get_view_meta(query)
    view_id = view_meta["id"]

    query_context_controller.set_context("view", view_id)
    try:
        inner_query = view_meta["query_ast"]
        # Only a SELECT view can absorb the outer query.
        merged = isinstance(inner_query, Select)
        if merged:
            inner_query = self.combine_view_select(inner_query, query)

        df = SQLQuery(inner_query, session=session).fetched_data.to_df()
    finally:
        query_context_controller.release_context("view", view_id)

    # remove duplicated columns
    deduplicated = df.loc[:, ~df.columns.duplicated()]
    if merged:
        return deduplicated
    # The outer query was not merged into the view, so apply it to the fetched frame.
    return query_df(deduplicated, query, session=session)
@staticmethod
def _get_model_data(predictor_record, integraion_record, with_secrets: bool = True):
    """Build the description of one model from its predictor and integration rows.

    Args:
        predictor_record: Predictor row describing the model.
        integraion_record: Integration row of the engine the model belongs to.
        with_secrets (bool, optional): If False, values of ``using`` arguments that
            the handler module declares as secret are replaced with ``"******"``.
            Defaults to True.

    Returns:
        dict: ``{"name", "metadata", "created_at"}`` where ``metadata`` is the
        model description (type, status, training options, timings, ...).
    """
    from mindsdb.interfaces.database.integrations import integration_controller

    predictor_data = predictor_record.data or {}

    # Finished training: stop - start. Still running (no stop time, not failed):
    # time elapsed so far. Otherwise unknown.
    started_at = predictor_record.training_start_at
    stopped_at = predictor_record.training_stop_at
    training_time = None
    if started_at is not None:
        if stopped_at is not None:
            training_time = round((stopped_at - started_at).total_seconds(), 3)
        elif predictor_record.status != "error":
            training_time = round((datetime.datetime.now() - started_at).total_seconds(), 3)

    # region Hide sensitive info
    training_options = predictor_record.learn_args
    handler_module = integration_controller.get_handler_module(integraion_record.engine)

    if with_secrets is False and handler_module:
        model_using_args = getattr(handler_module, "model_using_args", None)
        if (
            isinstance(model_using_args, dict)
            and isinstance(training_options, dict)
            and isinstance(training_options.get("using"), dict)
        ):
            masked_using = deepcopy(training_options["using"])
            for key, value in model_using_args.items():
                if key in masked_using and value.get("secret", False):
                    masked_using[key] = "******"
            # Build a new outer dict so the record's own learn_args is left untouched.
            training_options = {**training_options, "using": masked_using}
    # endregion

    # Guard against a missing or empty target list instead of failing on [0].
    to_predict = predictor_record.to_predict

    predictor_meta = {
        "type": "model",
        "id": predictor_record.id,
        "engine": integraion_record.engine,
        "engine_name": integraion_record.name,
        "active": predictor_record.active,
        "version": predictor_record.version,
        "status": predictor_record.status,
        "accuracy": None,
        "predict": to_predict[0] if to_predict else None,
        "update_status": predictor_record.update_status,
        "mindsdb_version": predictor_record.mindsdb_version,
        "error": predictor_data.get("error"),
        "select_data_query": predictor_record.fetch_data_query,
        "training_options": training_options,
        "deletable": True,
        "label": predictor_record.label,
        "current_training_phase": predictor_record.training_phase_current,
        "total_training_phases": predictor_record.training_phase_total,
        "training_phase_name": predictor_record.training_phase_name,
        "training_time": training_time,
    }

    # The reported accuracy is the mean over all recorded accuracy values.
    accuracies = predictor_data.get("accuracies", None)
    if accuracies is not None and len(accuracies) > 0:
        predictor_meta["accuracy"] = float(np.mean(list(accuracies.values())))

    return {"name": predictor_record.name, "metadata": predictor_meta, "created_at": predictor_record.created_at}
def get_model(self, name: str):
    """Return the description of the active model called *name* in this project.

    Args:
        name (str): Name of the model.

    Returns:
        dict | None: Result of ``_get_model_data`` for the first matching row, or
        None when no active, non-deleted model with this name exists.
    """
    query = db.session.query(db.Predictor, db.Integration).filter_by(
        project_id=self.id, active=True, name=name, deleted_at=sa.null(), company_id=ctx.company_id
    )
    query = query.join(db.Integration, db.Integration.id == db.Predictor.integration_id)
    row = query.order_by(db.Predictor.name, db.Predictor.id).first()

    if row is None:
        return None
    predictor_record, integration_record = row[0], row[1]
    return self._get_model_data(predictor_record, integration_record)
def get_model_by_id(self, model_id: int):
    """Return the description of the model with the given id in this project.

    Unlike ``get_model`` this does not filter on the ``active`` flag.

    Args:
        model_id (int): Id of the predictor row.

    Returns:
        dict | None: Result of ``_get_model_data`` for the matching row, or None
        when no non-deleted model with this id exists in the project.
    """
    query = db.session.query(db.Predictor, db.Integration).filter_by(
        project_id=self.id, id=model_id, deleted_at=sa.null(), company_id=ctx.company_id
    )
    query = query.join(db.Integration, db.Integration.id == db.Predictor.integration_id)
    row = query.order_by(db.Predictor.name, db.Predictor.id).first()

    if row is None:
        return None
    predictor_record, integration_record = row[0], row[1]
    return self._get_model_data(predictor_record, integration_record)
def get_models(self, active: bool = True, with_secrets: bool = True):
    """List the descriptions of the non-deleted models of this project.

    Args:
        active (bool, optional): When a bool, only models with this ``active`` flag
            are returned; any non-bool value disables the filter. Defaults to True.
        with_secrets (bool, optional): Forwarded to ``_get_model_data``.
            Defaults to True.

    Returns:
        list[dict]: One ``_get_model_data`` result per model, ordered by name and id.
    """
    query = db.session.query(db.Predictor, db.Integration).filter_by(
        project_id=self.id, deleted_at=sa.null(), company_id=ctx.company_id
    )
    if isinstance(active, bool):
        query = query.filter_by(active=active)

    query = query.join(db.Integration, db.Integration.id == db.Predictor.integration_id)
    query = query.order_by(db.Predictor.name, db.Predictor.id)

    return [
        self._get_model_data(predictor_record, integration_record, with_secrets)
        for predictor_record, integration_record in query.all()
    ]
def get_agents(self):
    """List the non-deleted agents of this project, ordered by name.

    Returns:
        list[dict]: One ``{"name", "query", "metadata"}`` entry per agent.
    """
    agents_query = db.session.query(db.Agents).filter(
        db.Agents.project_id == self.id,
        db.Agents.company_id == ctx.company_id,
        db.Agents.deleted_at == sa.null(),
    )

    agents = []
    for agent_record in agents_query.order_by(db.Agents.name).all():
        agents.append(
            {
                "name": agent_record.name,
                "query": agent_record.query,
                "metadata": {"type": "agent", "id": agent_record.id, "deletable": True},
            }
        )
    return agents
def get_knowledge_bases(self):
    """Map each knowledge base name of this project to its table metadata.

    Returns:
        dict: ``{name: {"type": "knowledge_base", "id": ..., "deletable": True}}``.
    """
    # Imported here rather than at module level, as in the rest of this class.
    from mindsdb.api.executor.controllers.session_controller import SessionController

    kb_controller = SessionController().kb_controller

    knowledge_bases = {}
    for kb in kb_controller.list(self.name):
        knowledge_bases[kb["name"]] = {"type": "knowledge_base", "id": kb["id"], "deletable": True}
    return knowledge_bases
def get_views(self):
    """List the views of this project, ordered by name and id.

    Returns:
        list[dict]: One ``{"name", "query", "metadata"}`` entry per view.
    """
    views_query = db.session.query(db.View).filter_by(project_id=self.id, company_id=ctx.company_id)

    views = []
    for view_record in views_query.order_by(db.View.name, db.View.id).all():
        views.append(
            {
                "name": view_record.name,
                "query": view_record.query,
                "metadata": {"type": "view", "id": view_record.id, "deletable": True},
            }
        )
    return views
def get_view(self, name: str, strict_case: bool = False) -> dict | None:
    """Look up a single view of this project by name.

    Args:
        name (str): The name of the view to retrieve.
        strict_case (bool, optional): If True, the name must match exactly;
            otherwise both sides are lower-cased before comparing. Defaults to False.

    Returns:
        dict | None: ``{"name", "query", "metadata"}`` for the view, or None when
        no view matches.
    """
    if strict_case:
        name_filter = db.View.name == name
    else:
        name_filter = sa.func.lower(db.View.name) == name.lower()

    view_record = (
        db.session.query(db.View)
        .filter(
            db.View.project_id == self.id,
            db.View.company_id == ctx.company_id,
        )
        .filter(name_filter)
        .one_or_none()
    )

    if view_record is None:
        return None

    metadata = {"type": "view", "id": view_record.id, "deletable": True}
    return {"name": view_record.name, "query": view_record.query, "metadata": metadata}
@profiler.profile()
def get_tables(self):
    """Collect every table-like object of this project, keyed by name.

    Entries are added in this order: the fixed ``models`` table, active models,
    views, agents, knowledge bases. A later entry replaces an earlier one that
    has the same name.

    Returns:
        OrderedDict: Mapping of object name to its metadata dict.
    """
    tables = OrderedDict()
    tables["models"] = {"type": "table", "deletable": False}

    for model in self.get_models():
        model_meta = model["metadata"]
        if model_meta["active"] is True:
            tables[model["name"]] = model_meta

    for view in self.get_views():
        tables[view["name"]] = view["metadata"]

    for agent in self.get_agents():
        tables[agent["name"]] = agent["metadata"]

    tables.update(self.get_knowledge_bases())
    return tables
445 def get_columns(self, table_name: str) -> list[str] | None:
446 columns = []
447 tables = self.get_tables()
448 table = None
449 for key, value in tables.items():
450 if key.lower() == table_name.lower():
451 table_name = key
452 table = value
453 if table is None:
454 return columns
456 match str(table["type"]).upper():
457 case "MODEL":
458 predictor_record = db.Predictor.query.filter_by(
459 company_id=ctx.company_id, project_id=self.id, name=table_name
460 ).first()
461 columns = []
462 if predictor_record is not None:
463 if isinstance(predictor_record.dtype_dict, dict):
464 columns = list(predictor_record.dtype_dict.keys())
465 elif predictor_record.to_predict is not None:
466 # no dtype_dict, use target
467 columns = predictor_record.to_predict
468 if not isinstance(columns, list):
469 columns = [columns]
470 case "VIEW":
471 query = Select(targets=[Star()], from_table=Identifier(table_name), limit=Constant(1))
473 from mindsdb.api.executor.controllers.session_controller import SessionController
475 session = SessionController()
476 session.database = self.name
477 df = self.query_view(query, session)
478 columns = df.columns
479 case "AGENT":
480 agent = db.Agents.query.filter_by(
481 company_id=ctx.company_id, project_id=self.id, name=table_name
482 ).first()
483 if agent is not None:
484 from mindsdb.interfaces.agents.constants import ASSISTANT_COLUMN, USER_COLUMN
486 columns = [ASSISTANT_COLUMN, USER_COLUMN]
487 case "KNOWLEDGE_BASE":
488 columns = ["id", "chunk_id", "chunk_content", "metadata", "relevance", "distance"]
489 case "TABLE":
490 # like 'mindsdb.models'
491 pass
492 case _:
493 logger.warning(f"Unknown table type: {table['type']}")
495 return columns
498class ProjectController:
499 def __init__(self):
500 pass
def get_list(self) -> List[Project]:
    """List the non-deleted projects of the current company, ordered by name.

    Returns:
        List[Project]: One ``Project`` wrapper per stored row.
    """
    # Projects are stored under company "0" when the context carries no company.
    company_id = "0" if ctx.company_id is None else ctx.company_id

    visible = (db.Project.company_id == company_id) & (db.Project.deleted_at == sa.null())
    records = db.Project.query.filter(visible).order_by(db.Project.name)

    projects = []
    for record in records:
        projects.append(Project.from_record(record))
    return projects
510 def get(
511 self,
512 id: int | None = None,
513 name: str | None = None,
514 deleted: bool = False,
515 is_default: bool = False,
516 strict_case: bool = False,
517 ) -> Project:
518 """Get a project by id or name.
520 Args:
521 id (int | None, optional): The id of the project to retrieve. Cannot be used with 'name'.
522 name (str | None, optional): The name of the project to retrieve. Cannot be used with 'id'.
523 deleted (bool, optional): If True, include deleted projects. Defaults to False.
524 is_default (bool, optional): If True, only return the default project. Defaults to False.
525 strict_case (bool, optional): If True, the project name is case-sensitive. Defaults to False.
527 Raises:
528 ValueError: If both 'id' and 'name' are provided.
529 EntityNotExistsError: If the project is not found.
531 Returns:
532 Project: The project instance matching the given criteria.
533 """
534 if id is not None and name is not None: 534 ↛ 535line 534 didn't jump to line 535 because the condition on line 534 was never true
535 raise ValueError("Both 'id' and 'name' can't be provided at the same time")
537 company_id = ctx.company_id if ctx.company_id is not None else "0"
538 q = db.Project.query.filter_by(company_id=company_id)
540 if id is not None:
541 q = q.filter_by(id=id)
542 elif name is not None: 542 ↛ 548line 542 didn't jump to line 548 because the condition on line 542 was always true
543 if strict_case:
544 q = q.filter((db.Project.name == name))
545 else:
546 q = q.filter((sa.func.lower(db.Project.name) == sa.func.lower(name)))
548 if deleted is True: 548 ↛ 549line 548 didn't jump to line 549 because the condition on line 548 was never true
549 q = q.filter((db.Project.deleted_at != sa.null()))
550 else:
551 q = q.filter_by(deleted_at=sa.null())
553 if is_default: 553 ↛ 554line 553 didn't jump to line 554 because the condition on line 553 was never true
554 q = q.filter(db.Project.metadata_["is_default"].as_boolean() == is_default)
556 record = q.first()
558 if record is None:
559 raise EntityNotExistsError(f"Project not found: {name}")
560 return Project.from_record(record)
def add(self, name: str) -> Project:
    """Create a new project called *name* and return it.

    Args:
        name (str): Name of the project to create.

    Returns:
        Project: The newly created project.
    """
    new_project = Project()
    new_project.create(name=name)
    return new_project
567 def update(self, id: Optional[int] = None, name: Optional[str] = None, new_name: str = None) -> Project:
568 if id is not None and name is not None:
569 raise ValueError("Both 'id' and 'name' can't be provided at the same time")
571 if id is not None:
572 project = self.get(id=id)
573 else:
574 project = self.get(name=name)
576 if new_name is not None:
577 project.name = new_name
578 project.record.name = new_name
580 db.session.commit()
581 return project