Coverage for mindsdb / api / executor / datahub / datanodes / information_schema_datanode.py: 66%

103 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from dataclasses import astuple 

2 

3import pandas as pd 

4from mindsdb_sql_parser.ast.base import ASTNode 

5 

6from mindsdb.api.executor.datahub.datanodes.datanode import DataNode 

7from mindsdb.api.executor.datahub.datanodes.integration_datanode import IntegrationDataNode 

8from mindsdb.api.executor.datahub.datanodes.project_datanode import ProjectDataNode 

9from mindsdb.api.executor import exceptions as exc 

10from mindsdb.api.executor.utilities.sql import query_df 

11from mindsdb.api.executor.utilities.sql import get_query_tables 

12from mindsdb.interfaces.database.projects import ProjectController 

13from mindsdb.api.executor.datahub.classes.response import DataHubResponse 

14from mindsdb.integrations.libs.response import INF_SCHEMA_COLUMNS_NAMES 

15from mindsdb.utilities import log 

16 

17from .system_tables import ( 

18 SchemataTable, 

19 TablesTable, 

20 MetaTablesTable, 

21 ColumnsTable, 

22 MetaColumnsTable, 

23 EventsTable, 

24 RoutinesTable, 

25 PluginsTable, 

26 EnginesTable, 

27 MetaTableConstraintsTable, 

28 KeyColumnUsageTable, 

29 MetaColumnUsageTable, 

30 StatisticsTable, 

31 MetaColumnStatisticsTable, 

32 CharacterSetsTable, 

33 CollationsTable, 

34 MetaHandlerInfoTable, 

35) 

36from .mindsdb_tables import ( 

37 ModelsTable, 

38 DatabasesTable, 

39 MLEnginesTable, 

40 HandlersTable, 

41 JobsTable, 

42 QueriesTable, 

43 ChatbotsTable, 

44 KBTable, 

45 SkillsTable, 

46 AgentsTable, 

47 ViewsTable, 

48 TriggersTable, 

49) 

50 

51from mindsdb.api.executor.datahub.classes.tables_row import TablesRow 

52 

53 

# Module-level logger named after this module (mindsdb logging convention).
logger = log.getLogger(__name__)

55 

56 

class InformationSchemaDataNode(DataNode):
    """DataNode serving the virtual ``INFORMATION_SCHEMA`` database.

    Responsibilities:
      - answer queries against the information_schema system tables
        (``tables_list`` below), and
      - resolve database names (case-insensitive) to the DataNode that
        serves them: itself, the system ``log``/``files`` databases,
        integrations, or projects.
    """

    type = "INFORMATION_SCHEMA"

    # Every system / mindsdb table exposed under information_schema.
    tables_list = [
        SchemataTable,
        TablesTable,
        MetaTablesTable,
        ColumnsTable,
        MetaColumnsTable,
        EventsTable,
        RoutinesTable,
        PluginsTable,
        EnginesTable,
        MetaTableConstraintsTable,
        KeyColumnUsageTable,
        MetaColumnUsageTable,
        StatisticsTable,
        MetaColumnStatisticsTable,
        CharacterSetsTable,
        CollationsTable,
        ModelsTable,
        DatabasesTable,
        MLEnginesTable,
        HandlersTable,
        JobsTable,
        ChatbotsTable,
        KBTable,
        SkillsTable,
        AgentsTable,
        ViewsTable,
        TriggersTable,
        QueriesTable,
        MetaHandlerInfoTable,
    ]

    def __init__(self, session):
        self.session = session
        self.integration_controller = session.integration_controller
        self.project_controller = ProjectController()
        self.database_controller = session.database_controller
        # Databases that always exist and are handled by dedicated branches in get().
        self.persist_datanodes_names = ("log", "files")
        # Table-name -> table-class lookup; keys come from each class's `name`
        # attribute (presumably upper-case — lookups below upper-case their input).
        self.tables = {t.name: t for t in self.tables_list}

    def __getitem__(self, key):
        # Allow dict-style access: node["db_name"] delegates to get().
        return self.get(key)

    def get(self, name):
        """Resolve *name* (case-insensitive) to a DataNode, or return None if unknown.

        Resolution order: information_schema itself -> system 'log' db ->
        system 'files' db -> databases registered in the database controller
        (integration or project) -> any remaining integration by name.
        """
        name_lower = name.lower()

        if name_lower == "information_schema":
            return self

        if name_lower == "log":
            return self.database_controller.get_system_db("log")

        if name_lower == "files":
            return IntegrationDataNode(
                "files",
                ds_type="file",
                integration_controller=self.session.integration_controller,
            )

        # Case-insensitive lookup of the registered database, keeping its
        # original (cased) name for subsequent controller calls.
        existing_databases_meta = self.database_controller.get_dict()  # filter_type='project'
        database_name = None
        for key in existing_databases_meta:
            if key.lower() == name_lower:
                database_name = key
                break

        if database_name is None:
            return None

        database_meta = existing_databases_meta[database_name]
        if database_meta["type"] == "integration":
            integration = self.integration_controller.get(name=database_name)
            return IntegrationDataNode(
                database_name,
                ds_type=integration["engine"],
                integration_controller=self.session.integration_controller,
            )
        if database_meta["type"] == "project":
            project = self.database_controller.get_project(name=database_name)
            return ProjectDataNode(
                project=project,
                integration_controller=self.session.integration_controller,
                information_schema=self,
            )

        # Fallback: a database of another type may still match a registered
        # integration by name.
        integration_names = self.integration_controller.get_all().keys()
        for integration_name in integration_names:
            if integration_name.lower() == name_lower:
                datasource = self.integration_controller.get(name=integration_name)
                return IntegrationDataNode(
                    integration_name,
                    ds_type=datasource["engine"],
                    integration_controller=self.session.integration_controller,
                )

        return None

    def _get_table_class(self, table_name: str):
        """Return the system-table class registered for *table_name*.

        Args:
            table_name (str): Table name; matched case-insensitively
                (upper-cased before lookup).

        Raises:
            exc.TableNotExistError: If the table is not a known
                information_schema table.
        """
        table_name = table_name.upper()
        if table_name not in self.tables:
            # NOTE: message grammar fixed ("does not exist").
            raise exc.TableNotExistError(f"Table information_schema.{table_name} does not exist")
        return self.tables[table_name]

    def get_table_columns_df(self, table_name: str, schema_name: str | None = None) -> pd.DataFrame:
        """Get a DataFrame containing representation of information_schema.columns for the specified table.

        Args:
            table_name (str): The name of the table to get columns from.
            schema_name (str | None): Not in use. The name of the schema to get columns from.

        Returns:
            pd.DataFrame: A DataFrame containing representation of information_schema.columns for the specified table.
                The DataFrame has list of columns as in the integrations.libs.response.INF_SCHEMA_COLUMNS_NAMES
                but only 'COLUMN_NAME' column is filled with the actual column names.
                Other columns are filled with None.

        Raises:
            exc.TableNotExistError: If the table is not a known information_schema table.
        """
        table_columns_names = self._get_table_class(table_name).columns
        df = pd.DataFrame(pd.Series(table_columns_names, name=INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME))
        # Fill every other information_schema.columns field with None — only
        # the column names are known for these virtual tables.
        for column_name in astuple(INF_SCHEMA_COLUMNS_NAMES):
            if column_name == INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME:
                continue
            df[column_name] = None
        return df

    def get_table_columns_names(self, table_name: str, schema_name: str | None = None) -> list[str]:
        """Get a list of column names for the specified table.

        Args:
            table_name (str): The name of the table to get columns from.
            schema_name (str | None): Not in use. The name of the schema to get columns from.

        Returns:
            list[str]: A list of column names for the specified table.

        Raises:
            exc.TableNotExistError: If the table is not a known information_schema table.
        """
        return self._get_table_class(table_name).columns

    def get_integrations_names(self):
        """Return lower-cased names of all integrations, excluding 'files'."""
        integration_names = self.integration_controller.get_all().keys()
        # remove files from list to prevent doubling in 'select from INFORMATION_SCHEMA.TABLES'
        return [x.lower() for x in integration_names if x not in ("files",)]

    def get_projects_names(self):
        """Return lower-cased names of all projects."""
        projects = self.database_controller.get_dict(filter_type="project")
        return [x.lower() for x in projects]

    def get_tables(self):
        """Return a TablesRow for every information_schema table."""
        return [TablesRow(TABLE_NAME=name) for name in self.tables.keys()]

    def get_tree_tables(self):
        """Return only the tables flagged as visible (for tree/navigation views)."""
        return {name: table for name, table in self.tables.items() if table.visible}

    def query(self, query: ASTNode, session=None) -> DataHubResponse:
        """Execute *query* against a single information_schema table.

        Args:
            query (ASTNode): Parsed query; must reference exactly one table.
            session: Unused; kept for DataNode interface compatibility.

        Returns:
            DataHubResponse: Result rows, column metadata, affected_rows=0.

        Raises:
            exc.BadTableError: If the query references more than one table.
            exc.NotSupportedYet: If the referenced table is not implemented.
        """
        query_tables = [x[1] for x in get_query_tables(query)]

        if len(query_tables) != 1:
            raise exc.BadTableError(f"Only one table can be used in query to information_schema: {query}")

        table_name = query_tables[0].upper()

        if table_name not in self.tables:
            raise exc.NotSupportedYet("Information schema: Not implemented.")

        tbl = self.tables[table_name]

        # Tables without a get_data() implementation yield an empty frame with
        # the right columns.
        if hasattr(tbl, "get_data"):
            dataframe = tbl.get_data(query=query, inf_schema=self, session=self.session)
        else:
            dataframe = self._get_empty_table(tbl)
        data = query_df(dataframe, query, session=self.session)

        columns_info = [{"name": k, "type": v} for k, v in data.dtypes.items()]

        return DataHubResponse(data_frame=data, columns=columns_info, affected_rows=0)

    def _get_empty_table(self, table):
        """Return an empty DataFrame carrying *table*'s column names."""
        columns = table.columns
        data = []

        df = pd.DataFrame(data, columns=columns)
        return df