Coverage for mindsdb/api/executor/datahub/datanodes/project_datanode.py: 35%
108 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from copy import deepcopy
2from dataclasses import astuple
4import pandas as pd
5from mindsdb_sql_parser.ast.base import ASTNode
6from mindsdb_sql_parser import parse_sql
7from mindsdb_sql_parser.ast import (
8 BinaryOperation,
9 Identifier,
10 Constant,
11 Update,
12 Select,
13 Delete,
14)
16from mindsdb.utilities.exception import EntityNotExistsError
17from mindsdb.api.executor.datahub.datanodes.datanode import DataNode
18from mindsdb.api.executor.datahub.classes.tables_row import TablesRow
19from mindsdb.api.executor.datahub.classes.response import DataHubResponse
20from mindsdb.utilities.partitioning import process_dataframe_in_partitions
21from mindsdb.integrations.libs.response import INF_SCHEMA_COLUMNS_NAMES
24class ProjectDataNode(DataNode):
25 type = "project"
27 def __init__(self, project, integration_controller, information_schema):
28 self.project = project
29 self.integration_controller = integration_controller
30 self.information_schema = information_schema
32 def get_type(self):
33 return self.type
35 def get_tables(self):
36 tables = self.project.get_tables()
37 table_types = {
38 "table": "BASE TABLE",
39 "model": "MODEL",
40 "view": "VIEW",
41 "agent": "AGENT",
42 "knowledge_base": "KNOWLEDGE BASE",
43 }
44 tables = [{"TABLE_NAME": key, "TABLE_TYPE": table_types.get(val["type"])} for key, val in tables.items()]
45 result = [TablesRow.from_dict(row) for row in tables]
46 return result
48 def get_table_columns_df(self, table_name: str, schema_name: str | None = None) -> pd.DataFrame:
49 """Get a DataFrame containing representation of information_schema.columns for the specified table.
51 Args:
52 table_name (str): The name of the table to get columns from.
53 schema_name (str | None): Not in use. The name of the schema to get columns from.
55 Returns:
56 pd.DataFrame: A DataFrame containing representation of information_schema.columns for the specified table.
57 The DataFrame has list of columns as in the integrations.libs.response.INF_SCHEMA_COLUMNS_NAMES
58 but only 'COLUMN_NAME' column is filled with the actual column names.
59 Other columns are filled with None.
60 """
61 columns = self.project.get_columns(table_name)
63 data = []
64 row = {name: None for name in astuple(INF_SCHEMA_COLUMNS_NAMES)}
65 for column_name in columns:
66 r = row.copy()
67 r[INF_SCHEMA_COLUMNS_NAMES.COLUMN_NAME] = column_name
68 data.append(r)
70 return pd.DataFrame(data, columns=astuple(INF_SCHEMA_COLUMNS_NAMES))
72 def get_table_columns_names(self, table_name: str, schema_name: str | None = None) -> list[str]:
73 """Get a list of column names for the specified table.
75 Args:
76 table_name (str): The name of the table to get columns from.
77 schema_name (str | None): Not in use. The name of the schema to get columns from.
79 Returns:
80 list[str]: A list of column names for the specified table.
81 """
82 return self.project.get_columns(table_name)
84 def predict(self, model_name: str, df, version=None, params=None):
85 model_metadata = self.project.get_model(model_name)
86 if model_metadata is None:
87 raise Exception(f"Can't find model '{model_name}'")
88 model_metadata = model_metadata["metadata"]
89 if model_metadata["update_status"] == "available":
90 raise Exception(f"model '{model_name}' is obsolete and needs to be updated. Run 'RETRAIN {model_name};'")
91 ml_handler = self.integration_controller.get_ml_handler(model_metadata["engine_name"])
92 if params is not None and "partition_size" in params:
94 def callback(chunk):
95 return ml_handler.predict(
96 model_name, chunk, project_name=self.project.name, version=version, params=params
97 )
99 return pd.concat(process_dataframe_in_partitions(df, callback, params["partition_size"]))
101 return ml_handler.predict(model_name, df, project_name=self.project.name, version=version, params=params)
103 def query(self, query: ASTNode | str = None, session=None) -> DataHubResponse:
104 if isinstance(query, str): 104 ↛ 105line 104 didn't jump to line 105 because the condition on line 104 was never true
105 query = parse_sql(query)
107 if isinstance(query, Update): 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true
108 query_table = query.table.parts[0].lower()
109 kb_table = session.kb_controller.get_table(query_table, self.project.id)
110 if kb_table:
111 # this is the knowledge db
112 kb_table.update_query(query)
113 return DataHubResponse()
115 raise NotImplementedError(f"Can't update object: {query_table}")
117 elif isinstance(query, Delete): 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true
118 query_table = query.table.parts[0].lower()
119 kb_table = session.kb_controller.get_table(query_table, self.project.id)
120 if kb_table:
121 # this is the knowledge db
122 kb_table.delete_query(query)
123 return DataHubResponse()
125 raise NotImplementedError(f"Can't delete object: {query_table}")
127 elif isinstance(query, Select): 127 ↛ 174line 127 didn't jump to line 174 because the condition on line 127 was always true
128 match query.from_table.parts, query.from_table.is_quoted:
129 case [query_table], [is_quoted]:
130 ...
131 case [query_table, int(_)], [is_quoted, _]:
132 ...
133 case [query_table, str(version)], [is_quoted, _] if version.isdigit():
134 ...
135 case _:
136 raise EntityNotExistsError(
137 f"Table '{query.from_table}' not found in the database. The project database support only single-part names",
138 self.project.name,
139 )
141 if not is_quoted: 141 ↛ 145line 141 didn't jump to line 145 because the condition on line 141 was always true
142 query_table = query_table.lower()
144 # region is it query to 'models'?
145 if query_table in ("models", "jobs", "mdb_triggers", "chatbots", "skills", "agents"): 145 ↛ 156line 145 didn't jump to line 156 because the condition on line 145 was always true
146 new_query = deepcopy(query)
147 project_filter = BinaryOperation("=", args=[Identifier("project"), Constant(self.project.name)])
148 if new_query.where is None: 148 ↛ 151line 148 didn't jump to line 151 because the condition on line 148 was always true
149 new_query.where = project_filter
150 else:
151 new_query.where = BinaryOperation("and", args=[new_query.where, project_filter])
152 return self.information_schema.query(new_query)
153 # endregion
155 # other table from project
156 if self.project.get_view(query_table, strict_case=is_quoted):
157 # this is the view
158 df = self.project.query_view(query, session)
160 columns_info = [{"name": k, "type": v} for k, v in df.dtypes.items()]
162 return DataHubResponse(data_frame=df, columns=columns_info)
164 kb_table = session.kb_controller.get_table(query_table, self.project.id)
165 if kb_table:
166 # this is the knowledge db
167 df = kb_table.select_query(query)
168 columns_info = [{"name": k, "type": v} for k, v in df.dtypes.items()]
170 return DataHubResponse(data_frame=df, columns=columns_info)
172 raise EntityNotExistsError(f"Table '{query_table}' not found in database", self.project.name)
173 else:
174 raise NotImplementedError(f"Query not supported {query}")
176 def create_table(
177 self, table_name: Identifier, result_set=None, is_replace=False, params=None, is_create=None, **kwargs
178 ) -> DataHubResponse:
179 # is_create - create table
180 # is_replace - drop table if exists
181 # is_create==False and is_replace==False: just insert
183 from mindsdb.api.executor.controllers.session_controller import SessionController
185 session = SessionController()
187 if is_create:
188 raise NotImplementedError(f"Can't create table {table_name}")
190 table_name = table_name.parts[-1]
191 kb_table = session.kb_controller.get_table(table_name, self.project.id)
192 if kb_table:
193 # this is the knowledge db
194 if is_replace:
195 kb_table.clear()
197 df = result_set.to_df()
198 kb_table.insert(df, params=params)
199 return DataHubResponse()
201 raise ValueError(f"Table or Knowledge Base '{table_name}' doesn't exist")