Coverage for mindsdb / interfaces / database / database.py: 54%
82 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
2from collections import OrderedDict
4from mindsdb.interfaces.database.projects import ProjectController
5import mindsdb.utilities.profiler as profiler
6from mindsdb.utilities.config import config
7from mindsdb.utilities.exception import EntityNotExistsError
8from mindsdb.interfaces.database.log import LogDBController
class DatabaseController:
    """Single entry point for listing, resolving, updating and deleting databases.

    A "database" here is one of: a system database (``information_schema`` or
    ``log``), a project, or a non-ML integration.
    """

    def __init__(self):
        # Imported inside the constructor rather than at module level —
        # presumably to avoid a circular import; TODO confirm.
        from mindsdb.interfaces.database.integrations import integration_controller

        self.integration_controller = integration_controller
        self.project_controller = ProjectController()

        self.logs_db_controller = LogDBController()
        self.information_schema_controller = None

    def delete(self, name: str, strict_case: bool = False) -> None:
        """Delete a database (project or integration) by name.

        Args:
            name (str): The name of the database to delete.
            strict_case (bool, optional): If True, the database name is case-sensitive. Defaults to False.

        Raises:
            EntityNotExistsError: If the database does not exist.
            Exception: If the database cannot be deleted.

        Returns:
            None
        """
        databases = self.get_dict(lowercase=False)

        # An exact match always wins, so every call that worked before behaves the same.
        record = databases.get(name)
        if record is None and not strict_case:
            # Honour the documented case-insensitive lookup (and stay consistent
            # with `update`): fall back to the first name that differs only in case.
            wanted = name.lower()
            record = next(
                (rec for db_name, rec in databases.items() if db_name.lower() == wanted),
                None,
            )
        if record is None:
            raise EntityNotExistsError("Database does not exists", name)

        db_type = record["type"]
        if db_type == "project":
            # NOTE(review): assumes the project controller resolves `name`
            # case-insensitively when strict_case is False — confirm.
            project = self.get_project(name, strict_case)
            project.delete()
        elif db_type == "data":
            # NOTE(review): same assumption for the integration controller — confirm.
            self.integration_controller.delete(name, strict_case)
        else:
            raise Exception(f"Database with type '{db_type}' cannot be deleted")

    @profiler.profile()
    def get_list(self, filter_type: Optional[str] = None, with_secrets: Optional[bool] = True):
        """List all known databases as plain dicts.

        The result always starts with the two system databases, followed by the
        projects and then by every integration whose type is not ``"ml"``.

        Args:
            filter_type (str, optional): If given, keep only records whose ``"type"`` equals it.
            with_secrets (bool, optional): Forwarded to the integration controller as ``show_secrets``.

        Returns:
            list[dict]: Records with the keys ``name``, ``type``, ``id``, ``engine``,
            ``visible`` and ``deletable``. Integration records additionally carry
            ``class_type`` and ``connection_data``.
        """
        projects = self.project_controller.get_list()
        integrations = self.integration_controller.get_all(show_secrets=with_secrets)

        result = [
            {
                "name": "information_schema",
                "type": "system",
                "id": None,
                "engine": None,
                "visible": True,
                "deletable": False,
            },
            {"name": "log", "type": "system", "id": None, "engine": None, "visible": True, "deletable": False},
        ]

        default_project = config.get("default_project")
        for project in projects:
            result.append(
                {
                    "name": project.name,
                    "type": "project",
                    "id": project.id,
                    "engine": None,
                    "visible": True,
                    # The default project is the only project that cannot be deleted.
                    "deletable": project.name.lower() != default_project,
                }
            )

        for key, value in integrations.items():
            # Integrations without an explicit type are treated as data integrations.
            db_type = value.get("type", "data")
            if db_type == "ml":
                continue
            result.append(
                {
                    "name": key,
                    "type": db_type,
                    "id": value.get("id"),
                    "engine": value.get("engine"),
                    "class_type": value.get("class_type"),
                    "connection_data": value.get("connection_data"),
                    "visible": True,
                    "deletable": value.get("permanent", False) is False,
                }
            )

        if filter_type is not None:
            result = [rec for rec in result if rec["type"] == filter_type]

        return result

    def get_dict(self, filter_type: Optional[str] = None, lowercase: bool = True):
        """Return the databases from `get_list` keyed by name.

        Args:
            filter_type (str, optional): Passed through to `get_list`.
            lowercase (bool): If True, keys are lower-cased, so names that differ
                only in case collapse into one entry (the later record wins).

        Returns:
            OrderedDict: name -> ``{"type": ..., "engine": ..., "id": ...}``, in `get_list` order.
        """
        return OrderedDict(
            (
                rec["name"].lower() if lowercase else rec["name"],
                {"type": rec["type"], "engine": rec["engine"], "id": rec["id"]},
            )
            for rec in self.get_list(filter_type=filter_type)
        )

    def get_integration(self, integration_id):
        """Look up a data integration by its id.

        Args:
            integration_id: The id to search for among records of type ``"data"``.

        Returns:
            dict | None: ``{"name", "type", "engine", "id"}`` of the first match,
            or None when no data integration has that id.
        """
        # TODO get directly from db?
        for rec in self.get_list():
            if rec["id"] == integration_id and rec["type"] == "data":
                return {"name": rec["name"], "type": rec["type"], "engine": rec["engine"], "id": rec["id"]}
        return None

    def exists(self, db_name: str) -> bool:
        """Return True if a database with this name exists (case-insensitive)."""
        return db_name.lower() in self.get_dict()

    def get_project(self, name: str, strict_case: bool = False):
        """Get a project by name.

        Args:
            name (str): The name of the project to retrieve.
            strict_case (bool, optional): If True, the project name is case-sensitive. Defaults to False.

        Returns:
            Project: The project instance matching the given name.
        """
        return self.project_controller.get(name=name, strict_case=strict_case)

    def get_system_db(self, name: str):
        """Return the object backing a system database.

        Args:
            name (str): Either ``"log"`` or ``"information_schema"`` (exact match).

        Returns:
            The log DB controller for ``"log"``, or the ``datahub`` of a freshly
            created ``SessionController`` for ``"information_schema"``.

        Raises:
            Exception: If ``name`` is not one of the two system databases.
        """
        if name == "log":
            return self.logs_db_controller
        if name == "information_schema":
            # Imported on demand — presumably to avoid a circular import; TODO confirm.
            from mindsdb.api.executor.controllers.session_controller import SessionController

            session = SessionController()
            return session.datahub
        raise Exception(f"Database '{name}' does not exists")

    def update(self, name: str, data: dict, strict_case: bool = False, check_connection: bool = False) -> None:
        """
        Updates the database with the given name using the provided data.

        Parameters:
            name (str): The name of the database to update.
            data (dict): The data to update the database with. For a project it must
                contain exactly the key 'name'; for an integration exactly the key 'parameters'.
            strict_case (bool): if True, then name is case-sensitive
            check_connection (bool): if True, check the connection before applying the update

        Raises:
            EntityNotExistsError: If the database does not exist.
            ValueError: If `data` has the wrong keys, the new project name is empty or
                contains upper-case characters, or the database type cannot be updated.
            Exception: If the integration controller fails to apply the update.
        """
        databases = self.get_dict(lowercase=(not strict_case))
        if not strict_case:
            name = name.lower()
        if name not in databases:
            raise EntityNotExistsError("Database does not exist.", name)

        db_type = databases[name]["type"]
        if db_type == "project":
            # Only the name of the project can be updated.
            if {"name"} != set(data):
                raise ValueError("Only the 'name' field can be updated for projects.")
            new_name = data["name"]
            if not new_name:
                raise ValueError("New name must not be empty.")
            # Compare against .lower() instead of calling .islower(): islower() is
            # False for names without any cased character (e.g. "123"), which
            # would be rejected although they contain no upper-case letters.
            if new_name != new_name.lower():
                raise ValueError("New name must be in lower case.")
            self.project_controller.update(name=name, new_name=str(new_name))
            return

        if db_type == "data":
            # Only the parameters (connection data) of the integration can be updated.
            if {"parameters"} != set(data):
                raise ValueError("Only the 'parameters' field can be updated for integrations.")

            try:
                self.integration_controller.modify(name, data["parameters"], check_connection=check_connection)
            except Exception as e:
                # Chain explicitly so the original failure stays attached as __cause__.
                raise Exception(f"Failed to update database: {e}") from e
            return

        raise ValueError(f"Database with type '{db_type}' cannot be updated")