Coverage for mindsdb / interfaces / database / database.py: 54%

82 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2from collections import OrderedDict 

3 

4from mindsdb.interfaces.database.projects import ProjectController 

5import mindsdb.utilities.profiler as profiler 

6from mindsdb.utilities.config import config 

7from mindsdb.utilities.exception import EntityNotExistsError 

8from mindsdb.interfaces.database.log import LogDBController 

9 

10 

class DatabaseController:
    """Single entry point for everything that is exposed as a "database".

    Aggregates three sources behind one list / lookup / delete / update API:
    projects (``ProjectController``), integrations (``integration_controller``)
    and the two built-in system databases ``information_schema`` and ``log``.
    """

    def __init__(self):
        # Kept as a function-level import, as in the original code
        # (presumably to avoid a circular import - TODO confirm).
        from mindsdb.interfaces.database.integrations import integration_controller

        self.integration_controller = integration_controller
        self.project_controller = ProjectController()

        self.logs_db_controller = LogDBController()
        # Initialised to None and not read anywhere else in this class.
        self.information_schema_controller = None

    def delete(self, name: str, strict_case: bool = False) -> None:
        """Delete a database (project or integration) by name.

        Args:
            name (str): The name of the database to delete.
            strict_case (bool, optional): If True, the database name is case-sensitive. Defaults to False.

        Raises:
            EntityNotExistsError: If the database does not exist.
            Exception: If the database cannot be deleted.

        Returns:
            None
        """
        databases = self.get_dict(lowercase=False)

        # An exact match always wins, so every call that used to succeed
        # resolves to the same record as before.
        record = databases.get(name)
        if record is None and not strict_case:
            # Honour strict_case=False: the project / integration controllers
            # receive the same flag below, so the existence check must not be
            # stricter than they are.
            wanted = name.lower()
            record = next(
                (rec for db_name, rec in databases.items() if db_name.lower() == wanted),
                None,
            )
        if record is None:
            raise EntityNotExistsError("Database does not exists", name)

        db_type = record["type"]
        if db_type == "project":
            self.get_project(name, strict_case).delete()
            return
        if db_type == "data":
            self.integration_controller.delete(name, strict_case)
            return
        # Anything else (e.g. the "system" entries built in get_list) is not deletable.
        raise Exception(f"Database with type '{db_type}' cannot be deleted")

    @profiler.profile()
    def get_list(self, filter_type: Optional[str] = None, with_secrets: Optional[bool] = True) -> list:
        """List all databases as plain dicts.

        The result always starts with the two system databases, followed by
        the projects and then by every integration whose type is not "ml".

        Args:
            filter_type (str, optional): If given, keep only records whose "type" equals this value.
            with_secrets (bool, optional): Forwarded to the integration controller as ``show_secrets``.

        Returns:
            list: Records with the keys "name", "type", "id", "engine", "visible" and "deletable";
                integration records additionally carry "class_type" and "connection_data".
        """
        projects = self.project_controller.get_list()
        integrations = self.integration_controller.get_all(show_secrets=with_secrets)

        result = [
            {
                "name": "information_schema",
                "type": "system",
                "id": None,
                "engine": None,
                "visible": True,
                "deletable": False,
            },
            {"name": "log", "type": "system", "id": None, "engine": None, "visible": True, "deletable": False},
        ]

        # Looked up once instead of once per project.
        default_project = config.get("default_project")
        for project in projects:
            result.append(
                {
                    "name": project.name,
                    "type": "project",
                    "id": project.id,
                    "engine": None,
                    "visible": True,
                    # The default project must never be offered for deletion.
                    "deletable": project.name.lower() != default_project,
                }
            )

        for key, value in integrations.items():
            # Integrations without an explicit type are treated as data sources.
            db_type = value.get("type", "data")
            if db_type == "ml":
                continue
            result.append(
                {
                    "name": key,
                    "type": db_type,
                    "id": value.get("id"),
                    "engine": value.get("engine"),
                    "class_type": value.get("class_type"),
                    "connection_data": value.get("connection_data"),
                    "visible": True,
                    "deletable": value.get("permanent", False) is False,
                }
            )

        if filter_type is not None:
            result = [x for x in result if x["type"] == filter_type]

        return result

    def get_dict(self, filter_type: Optional[str] = None, lowercase: bool = True) -> OrderedDict:
        """Return the databases from ``get_list`` keyed by name.

        Args:
            filter_type (str, optional): Forwarded to ``get_list``.
            lowercase (bool, optional): If True, keys are the lower-cased names. Defaults to True.

        Returns:
            OrderedDict: name -> {"type", "engine", "id"}, in ``get_list`` order. When two names
                map to the same key, the later record replaces the value of the earlier one.
        """
        result = OrderedDict()
        for rec in self.get_list(filter_type=filter_type):
            key = rec["name"].lower() if lowercase else rec["name"]
            result[key] = {"type": rec["type"], "engine": rec["engine"], "id": rec["id"]}
        return result

    def get_integration(self, integration_id) -> Optional[dict]:
        """Find an integration of type "data" by its id.

        Args:
            integration_id: The id to look for.

        Returns:
            dict | None: {"name", "type", "engine", "id"} of the first match, or None if nothing matches.
        """
        # TODO get directly from db?
        for rec in self.get_list():
            if rec["id"] == integration_id and rec["type"] == "data":
                return {"name": rec["name"], "type": rec["type"], "engine": rec["engine"], "id": rec["id"]}
        return None

    def exists(self, db_name: str) -> bool:
        """Return True if a database with this name exists (compared case-insensitively)."""
        return db_name.lower() in self.get_dict()

    def get_project(self, name: str, strict_case: bool = False):
        """Get a project by name.

        Args:
            name (str): The name of the project to retrieve.
            strict_case (bool, optional): If True, the project name is case-sensitive. Defaults to False.

        Returns:
            Project: The project instance matching the given name.
        """
        return self.project_controller.get(name=name, strict_case=strict_case)

    def get_system_db(self, name: str):
        """Return the object backing one of the built-in system databases.

        Args:
            name (str): Either "log" or "information_schema".

        Returns:
            The log database controller for "log"; for "information_schema" the ``datahub``
            of a newly created ``SessionController``.

        Raises:
            Exception: If ``name`` is not one of the two system databases.
        """
        if name == "log":
            return self.logs_db_controller
        if name == "information_schema":
            # Function-level import, as in the original code.
            from mindsdb.api.executor.controllers.session_controller import SessionController

            session = SessionController()
            return session.datahub
        raise Exception(f"Database '{name}' does not exists")

    def update(self, name: str, data: dict, strict_case: bool = False, check_connection: bool = False) -> None:
        """Update the database with the given name using the provided data.

        Projects accept only a "name" key (rename); integrations accept only a
        "parameters" key (connection data).

        Args:
            name (str): The name of the database to update.
            data (dict): The data to update the database with.
            strict_case (bool): If True, the name is case-sensitive.
            check_connection (bool): If True, check the connection before applying the update.

        Raises:
            EntityNotExistsError: If the database does not exist.
            ValueError: If ``data`` has unsupported keys, the new project name is blank or not
                lower case, or the database type cannot be updated.
            Exception: If the integration controller fails to apply the update.
        """
        databases = self.get_dict(lowercase=(not strict_case))
        if not strict_case:
            name = name.lower()
        if name not in databases:
            raise EntityNotExistsError("Database does not exist.", name)

        db_type = databases[name]["type"]
        if db_type == "project":
            # Only the name of the project can be updated.
            if {"name"} != set(data):
                raise ValueError("Only the 'name' field can be updated for projects.")
            new_name = str(data["name"])
            if not new_name.strip():
                raise ValueError("New name must not be empty.")
            # Compare against lower() rather than calling islower(): islower() is
            # False for names without any cased character (e.g. "123"), which
            # would wrongly reject them as "not lower case".
            if new_name != new_name.lower():
                raise ValueError("New name must be in lower case.")
            self.project_controller.update(name=name, new_name=new_name)
            return

        if db_type == "data":
            # Only the parameters (connection data) of the integration can be updated.
            if {"parameters"} != set(data):
                raise ValueError("Only the 'parameters' field can be updated for integrations.")

            try:
                self.integration_controller.modify(name, data["parameters"], check_connection=check_connection)
            except Exception as e:
                # Chain explicitly so the original failure stays attached as __cause__.
                raise Exception(f"Failed to update database: {str(e)}") from e

            return

        raise ValueError(f"Database with type '{db_type}' cannot be updated")