Coverage for mindsdb / api / executor / datahub / datanodes / project_datanode.py: 35%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from copy import deepcopy 

2from dataclasses import astuple 

3 

4import pandas as pd 

5from mindsdb_sql_parser.ast.base import ASTNode 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb_sql_parser.ast import ( 

8 BinaryOperation, 

9 Identifier, 

10 Constant, 

11 Update, 

12 Select, 

13 Delete, 

14) 

15 

16from mindsdb.utilities.exception import EntityNotExistsError 

17from mindsdb.api.executor.datahub.datanodes.datanode import DataNode 

18from mindsdb.api.executor.datahub.classes.tables_row import TablesRow 

19from mindsdb.api.executor.datahub.classes.response import DataHubResponse 

20from mindsdb.utilities.partitioning import process_dataframe_in_partitions 

21from mindsdb.integrations.libs.response import INF_SCHEMA_COLUMNS_NAMES 

22 

23 

class ProjectDataNode(DataNode):
    """DataNode that exposes a MindsDB project as a queryable database.

    Routes SQL statements (Select/Update/Delete) and prediction calls to the
    project's objects: models, views, knowledge bases, and the project-scoped
    system tables (models, jobs, triggers, chatbots, skills, agents).
    """

    # Node type tag reported via get_type(); distinguishes this node from
    # integration/information_schema datanodes.
    type = "project"

    def __init__(self, project, integration_controller, information_schema):
        """Store the project and the controllers this node delegates to.

        Args:
            project: Project object (provides get_tables/get_columns/get_model/
                get_view/query_view — see usages below).
            integration_controller: used to resolve ML handlers by engine name.
            information_schema: datanode that serves project system tables.
        """
        self.project = project
        self.integration_controller = integration_controller
        self.information_schema = information_schema

    def get_type(self):
        """Return the datanode type tag ('project')."""
        return self.type

    def get_tables(self):
        """List all project objects as TablesRow records.

        Maps the project's internal object type to a SQL-style TABLE_TYPE.
        NOTE(review): an object type missing from the mapping yields
        TABLE_TYPE=None (dict.get default) — confirm that is intended.
        """
        tables = self.project.get_tables()
        table_types = {
            "table": "BASE TABLE",
            "model": "MODEL",
            "view": "VIEW",
            "agent": "AGENT",
            "knowledge_base": "KNOWLEDGE BASE",
        }
        tables = [{"TABLE_NAME": key, "TABLE_TYPE": table_types.get(val["type"])} for key, val in tables.items()]
        result = [TablesRow.from_dict(row) for row in tables]
        return result

    def get_table_columns_df(self, table_name: str, schema_name: str | None = None) -> pd.DataFrame:
        """Get a DataFrame containing representation of information_schema.columns for the specified table.

        Args:
            table_name (str): The name of the table to get columns from.
            schema_name (str | None): Not in use. The name of the schema to get columns from.

        Returns:
            pd.DataFrame: A DataFrame containing representation of information_schema.columns for the specified table.
                          The DataFrame has list of columns as in the integrations.libs.response.INF_SCHEMA_COLUMNS_NAMES
                          but only 'COLUMN_NAME' column is filled with the actual column names.
                          Other columns are filled with None.
        """
        columns = self.project.get_columns(table_name)

        data = []
        # Template row: every information_schema column preset to None;
        # copied per column so each row only fills in COLUMN_NAME.
        row = {name: None for name in astuple(INF_SCHEMA_COLUMNS_NAMES)}
        for column_name in columns:
            r = row.copy()
            r[INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME] = column_name
            data.append(r)

        return pd.DataFrame(data, columns=astuple(INF_SCHEMA_COLUMNS_NAMES))

    def get_table_columns_names(self, table_name: str, schema_name: str | None = None) -> list[str]:
        """Get a list of column names for the specified table.

        Args:
            table_name (str): The name of the table to get columns from.
            schema_name (str | None): Not in use. The name of the schema to get columns from.

        Returns:
            list[str]: A list of column names for the specified table.
        """
        return self.project.get_columns(table_name)

    def predict(self, model_name: str, df, version=None, params=None):
        """Run a prediction with one of the project's models.

        Args:
            model_name (str): name of the model within this project.
            df: input DataFrame with the rows to predict on.
            version: optional model version, passed through to the ML handler.
            params: optional dict of prediction params; if it contains
                'partition_size', the input is predicted in partitions and the
                partial results concatenated.

        Returns:
            Prediction result from the ML handler (a DataFrame when
            partitioning is used, since partial results are pd.concat'ed).

        Raises:
            Exception: if the model does not exist, or its update_status is
                'available' (model is obsolete and must be retrained first).
        """
        model_metadata = self.project.get_model(model_name)
        if model_metadata is None:
            raise Exception(f"Can't find model '{model_name}'")
        model_metadata = model_metadata["metadata"]
        if model_metadata["update_status"] == "available":
            raise Exception(f"model '{model_name}' is obsolete and needs to be updated. Run 'RETRAIN {model_name};'")
        ml_handler = self.integration_controller.get_ml_handler(model_metadata["engine_name"])
        if params is not None and "partition_size" in params:
            # Predict chunk-by-chunk: the callback is applied to each partition
            # of df, and the partial frames are concatenated into one result.
            def callback(chunk):
                return ml_handler.predict(
                    model_name, chunk, project_name=self.project.name, version=version, params=params
                )

            return pd.concat(process_dataframe_in_partitions(df, callback, params["partition_size"]))

        return ml_handler.predict(model_name, df, project_name=self.project.name, version=version, params=params)

    def query(self, query: ASTNode | str = None, session=None) -> DataHubResponse:
        """Execute a SQL query against this project's objects.

        Update/Delete are supported only for knowledge-base tables. Select is
        routed to: the information_schema node (for project system tables,
        with a `project = <name>` filter injected), a project view, or a
        knowledge-base table — in that order.

        Args:
            query (ASTNode | str): parsed AST or raw SQL string (parsed here).
            session: session object providing kb_controller for KB lookups.

        Returns:
            DataHubResponse: result frame plus column metadata (empty for
            Update/Delete).

        Raises:
            EntityNotExistsError: table name is multi-part or not found.
            NotImplementedError: unsupported statement or target object.
        """
        if isinstance(query, str):
            query = parse_sql(query)

        if isinstance(query, Update):
            query_table = query.table.parts[0].lower()
            kb_table = session.kb_controller.get_table(query_table, self.project.id)
            if kb_table:
                # this is the knowledge db
                kb_table.update_query(query)
                return DataHubResponse()

            raise NotImplementedError(f"Can't update object: {query_table}")

        elif isinstance(query, Delete):
            query_table = query.table.parts[0].lower()
            kb_table = session.kb_controller.get_table(query_table, self.project.id)
            if kb_table:
                # this is the knowledge db
                kb_table.delete_query(query)
                return DataHubResponse()

            raise NotImplementedError(f"Can't delete object: {query_table}")

        elif isinstance(query, Select):
            # Accept only single-part table names, optionally followed by a
            # version suffix (an int part, or a digit-only string part). The
            # version is not used directly here — the full query (including
            # the version identifier) is passed downstream; presumably
            # resolved there — TODO confirm.
            match query.from_table.parts, query.from_table.is_quoted:
                case [query_table], [is_quoted]:
                    ...
                case [query_table, int(_)], [is_quoted, _]:
                    ...
                case [query_table, str(version)], [is_quoted, _] if version.isdigit():
                    ...
                case _:
                    raise EntityNotExistsError(
                        f"Table '{query.from_table}' not found in the database. The project database support only single-part names",
                        self.project.name,
                    )

            # Unquoted identifiers are matched case-insensitively.
            if not is_quoted:
                query_table = query_table.lower()

            # region is it query to 'models'?
            if query_table in ("models", "jobs", "mdb_triggers", "chatbots", "skills", "agents"):
                # Project system table: re-run the query against
                # information_schema, scoped to this project by AND-ing a
                # `project = <name>` condition onto any existing WHERE.
                new_query = deepcopy(query)
                project_filter = BinaryOperation("=", args=[Identifier("project"), Constant(self.project.name)])
                if new_query.where is None:
                    new_query.where = project_filter
                else:
                    new_query.where = BinaryOperation("and", args=[new_query.where, project_filter])
                return self.information_schema.query(new_query)
            # endregion

            # other table from project
            if self.project.get_view(query_table, strict_case=is_quoted):
                # this is the view
                df = self.project.query_view(query, session)

                columns_info = [{"name": k, "type": v} for k, v in df.dtypes.items()]

                return DataHubResponse(data_frame=df, columns=columns_info)

            kb_table = session.kb_controller.get_table(query_table, self.project.id)
            if kb_table:
                # this is the knowledge db
                df = kb_table.select_query(query)
                columns_info = [{"name": k, "type": v} for k, v in df.dtypes.items()]

                return DataHubResponse(data_frame=df, columns=columns_info)

            raise EntityNotExistsError(f"Table '{query_table}' not found in database", self.project.name)
        else:
            raise NotImplementedError(f"Query not supported {query}")

    def create_table(
        self, table_name: Identifier, result_set=None, is_replace=False, params=None, is_create=None, **kwargs
    ) -> DataHubResponse:
        """Insert data into a project table (knowledge bases only).

        Flag semantics (see comments below): is_create requests table
        creation (unsupported here); is_replace clears existing data first;
        both False means plain insert.

        Args:
            table_name (Identifier): target table; only the last name part is
                used for the knowledge-base lookup.
            result_set: result set convertible to a DataFrame via to_df().
            is_replace (bool): clear the KB table before inserting.
            params: insert params forwarded to the KB table.
            is_create: if truthy, raise — creating tables is not supported.

        Returns:
            DataHubResponse: empty response on successful insert.

        Raises:
            NotImplementedError: when is_create is requested.
            ValueError: when the target is not an existing knowledge base.
        """
        # is_create - create table
        # is_replace - drop table if exists
        # is_create==False and is_replace==False: just insert

        # Local import to avoid a circular dependency — TODO confirm; note a
        # fresh SessionController is built here rather than reusing a caller
        # session.
        from mindsdb.api.executor.controllers.session_controller import SessionController

        session = SessionController()

        if is_create:
            raise NotImplementedError(f"Can't create table {table_name}")

        table_name = table_name.parts[-1]
        kb_table = session.kb_controller.get_table(table_name, self.project.id)
        if kb_table:
            # this is the knowledge db
            if is_replace:
                kb_table.clear()

            df = result_set.to_df()
            kb_table.insert(df, params=params)
            return DataHubResponse()

        raise ValueError(f"Table or Knowledge Base '{table_name}' doesn't exist")