Coverage for mindsdb / api / executor / datahub / datanodes / information_schema_datanode.py: 66%
103 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from dataclasses import astuple
3import pandas as pd
4from mindsdb_sql_parser.ast.base import ASTNode
6from mindsdb.api.executor.datahub.datanodes.datanode import DataNode
7from mindsdb.api.executor.datahub.datanodes.integration_datanode import IntegrationDataNode
8from mindsdb.api.executor.datahub.datanodes.project_datanode import ProjectDataNode
9from mindsdb.api.executor import exceptions as exc
10from mindsdb.api.executor.utilities.sql import query_df
11from mindsdb.api.executor.utilities.sql import get_query_tables
12from mindsdb.interfaces.database.projects import ProjectController
13from mindsdb.api.executor.datahub.classes.response import DataHubResponse
14from mindsdb.integrations.libs.response import INF_SCHEMA_COLUMNS_NAMES
15from mindsdb.utilities import log
17from .system_tables import (
18 SchemataTable,
19 TablesTable,
20 MetaTablesTable,
21 ColumnsTable,
22 MetaColumnsTable,
23 EventsTable,
24 RoutinesTable,
25 PluginsTable,
26 EnginesTable,
27 MetaTableConstraintsTable,
28 KeyColumnUsageTable,
29 MetaColumnUsageTable,
30 StatisticsTable,
31 MetaColumnStatisticsTable,
32 CharacterSetsTable,
33 CollationsTable,
34 MetaHandlerInfoTable,
35)
36from .mindsdb_tables import (
37 ModelsTable,
38 DatabasesTable,
39 MLEnginesTable,
40 HandlersTable,
41 JobsTable,
42 QueriesTable,
43 ChatbotsTable,
44 KBTable,
45 SkillsTable,
46 AgentsTable,
47 ViewsTable,
48 TriggersTable,
49)
51from mindsdb.api.executor.datahub.classes.tables_row import TablesRow
54logger = log.getLogger(__name__)
class InformationSchemaDataNode(DataNode):
    """Data node that answers queries against ``information_schema``.

    The node keeps a registry of table classes (``tables_list``) keyed by
    their ``name`` attribute, and also acts as a lookup point for the other
    data nodes (system "log" db, "files", projects and integrations) via
    :meth:`get`.

    Every registered table class is expected to expose ``name`` and
    ``columns``; ``visible`` is read by :meth:`get_tree_tables` and an
    optional ``get_data`` callable is used by :meth:`query`.
    """

    type = "INFORMATION_SCHEMA"

    tables_list = [
        SchemataTable,
        TablesTable,
        MetaTablesTable,
        ColumnsTable,
        MetaColumnsTable,
        EventsTable,
        RoutinesTable,
        PluginsTable,
        EnginesTable,
        MetaTableConstraintsTable,
        KeyColumnUsageTable,
        MetaColumnUsageTable,
        StatisticsTable,
        MetaColumnStatisticsTable,
        CharacterSetsTable,
        CollationsTable,
        ModelsTable,
        DatabasesTable,
        MLEnginesTable,
        HandlersTable,
        JobsTable,
        ChatbotsTable,
        KBTable,
        SkillsTable,
        AgentsTable,
        ViewsTable,
        TriggersTable,
        QueriesTable,
        MetaHandlerInfoTable,
    ]

    def __init__(self, session):
        """Bind the node to a session and index the registered tables by name.

        Args:
            session: Object providing ``integration_controller`` and
                ``database_controller`` attributes.
        """
        self.session = session
        self.integration_controller = session.integration_controller
        self.project_controller = ProjectController()
        self.database_controller = session.database_controller
        self.persist_datanodes_names = ("log", "files")
        self.tables = {t.name: t for t in self.tables_list}

    def __getitem__(self, key):
        """Alias for :meth:`get`, allowing ``node[name]`` lookups."""
        return self.get(key)

    def _build_integration_datanode(self, integration_name):
        """Create an ``IntegrationDataNode`` for a stored integration.

        The engine is read from the ``"engine"`` key of the record returned
        by the integration controller.
        """
        integration = self.integration_controller.get(name=integration_name)
        return IntegrationDataNode(
            integration_name,
            ds_type=integration["engine"],
            integration_controller=self.session.integration_controller,
        )

    def get(self, name):
        """Return the data node registered under ``name`` (case-insensitive).

        Lookup order:
            1. ``information_schema`` -> this node.
            2. ``log`` -> system db from the database controller.
            3. ``files`` -> file-based ``IntegrationDataNode``.
            4. Databases known to the database controller (integration or
               project, depending on the ``"type"`` of their metadata).
            5. Integrations known to the integration controller.

        Args:
            name (str): Database name to resolve.

        Returns:
            The matching data node, or ``None`` when nothing matches.
        """
        name_lower = name.lower()

        if name_lower == "information_schema":
            return self

        if name_lower == "log":
            return self.database_controller.get_system_db("log")

        if name_lower == "files":
            return IntegrationDataNode(
                "files",
                ds_type="file",
                integration_controller=self.session.integration_controller,
            )

        existing_databases_meta = self.database_controller.get_dict()  # filter_type='project'
        # Find the stored spelling of the name; the first case-insensitive match wins.
        database_name = None
        for key in existing_databases_meta:
            if key.lower() == name_lower:
                database_name = key
                break

        if database_name is None:
            return None

        database_meta = existing_databases_meta[database_name]
        if database_meta["type"] == "integration":
            return self._build_integration_datanode(database_name)
        if database_meta["type"] == "project":
            project = self.database_controller.get_project(name=database_name)
            return ProjectDataNode(
                project=project,
                integration_controller=self.session.integration_controller,
                information_schema=self,
            )

        # Database of another type: fall back to the integration controller.
        integration_names = self.integration_controller.get_all().keys()
        for integration_name in integration_names:
            if integration_name.lower() == name_lower:
                return self._build_integration_datanode(integration_name)

        return None

    def _get_table(self, table_name: str):
        """Return the registered table class for ``table_name``.

        The name is upper-cased before the lookup.

        Raises:
            exc.TableNotExistError: If no table with that name is registered.
        """
        table_name = table_name.upper()
        if table_name not in self.tables:
            raise exc.TableNotExistError(f"Table information_schema.{table_name} does not exists")
        return self.tables[table_name]

    def get_table_columns_df(self, table_name: str, schema_name: str | None = None) -> pd.DataFrame:
        """Get a DataFrame containing representation of information_schema.columns for the specified table.

        Args:
            table_name (str): The name of the table to get columns from.
            schema_name (str | None): Not in use. The name of the schema to get columns from.

        Returns:
            pd.DataFrame: A DataFrame containing representation of information_schema.columns for the specified table.
                          The DataFrame has list of columns as in the integrations.libs.response.INF_SCHEMA_COLUMNS_NAMES
                          but only 'COLUMN_NAME' column is filled with the actual column names.
                          Other columns are filled with None.

        Raises:
            exc.TableNotExistError: If the table is not registered in this node.
        """
        table_columns_names = self._get_table(table_name).columns
        df = pd.DataFrame(pd.Series(table_columns_names, name=INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME))
        for column_name in astuple(INF_SCHEMA_COLUMNS_NAMES):
            if column_name == INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME:
                continue
            df[column_name] = None
        return df

    def get_table_columns_names(self, table_name: str, schema_name: str | None = None) -> list[str]:
        """Get a list of column names for the specified table.

        Args:
            table_name (str): The name of the table to get columns from.
            schema_name (str | None): Not in use. The name of the schema to get columns from.

        Returns:
            list[str]: A list of column names for the specified table.

        Raises:
            exc.TableNotExistError: If the table is not registered in this node.
        """
        return self._get_table(table_name).columns

    def get_integrations_names(self):
        """Return lower-cased integration names, excluding ``files``.

        The comparison is done on the lower-cased name because that is the
        form that is returned: any spelling of "files" would otherwise still
        show up as "files" in the result.
        """
        integration_names = self.integration_controller.get_all().keys()
        # remove files from list to prevent doubling in 'select from INFORMATION_SCHEMA.TABLES'
        lowered_names = (x.lower() for x in integration_names)
        return [x for x in lowered_names if x != "files"]

    def get_projects_names(self):
        """Return lower-cased names of all databases of type ``project``."""
        projects = self.database_controller.get_dict(filter_type="project")
        return [x.lower() for x in projects]

    def get_tables(self):
        """Return one ``TablesRow`` per registered table name."""
        return [TablesRow(TABLE_NAME=name) for name in self.tables]

    def get_tree_tables(self):
        """Return the ``name -> table`` mapping restricted to visible tables."""
        return {name: table for name, table in self.tables.items() if table.visible}

    def query(self, query: ASTNode, session=None) -> DataHubResponse:
        """Execute ``query`` against a single information_schema table.

        Args:
            query (ASTNode): Parsed query; it must reference exactly one table.
            session: Unused; the session given at construction time is used.

        Returns:
            DataHubResponse: Resulting frame, its column names/dtypes and
            ``affected_rows=0``.

        Raises:
            exc.BadTableError: If the query does not reference exactly one table.
            exc.NotSupportedYet: If the referenced table is not registered.
        """
        query_tables = [x[1] for x in get_query_tables(query)]

        if len(query_tables) != 1:
            raise exc.BadTableError(f"Only one table can be used in query to information_schema: {query}")

        table_name = query_tables[0].upper()

        if table_name not in self.tables:
            raise exc.NotSupportedYet("Information schema: Not implemented.")

        tbl = self.tables[table_name]

        # Tables without a data provider are served as empty frames.
        if hasattr(tbl, "get_data"):
            dataframe = tbl.get_data(query=query, inf_schema=self, session=self.session)
        else:
            dataframe = self._get_empty_table(tbl)
        data = query_df(dataframe, query, session=self.session)

        columns_info = [{"name": k, "type": v} for k, v in data.dtypes.items()]

        return DataHubResponse(data_frame=data, columns=columns_info, affected_rows=0)

    def _get_empty_table(self, table):
        """Return an empty DataFrame that has the columns of ``table``."""
        return pd.DataFrame([], columns=table.columns)