Coverage for mindsdb / api / executor / utilities / sql.py: 57%
137 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import copy
2from typing import List
4import duckdb
5from duckdb import InvalidInputException
6import numpy as np
7import orjson
9from mindsdb_sql_parser import parse_sql
10from mindsdb_sql_parser.ast import ASTNode, Select, Identifier, Function, Constant
12from mindsdb.integrations.utilities.query_traversal import query_traversal
13from mindsdb.utilities import log
14from mindsdb.utilities.exception import QueryError
15from mindsdb.utilities.functions import resolve_table_identifier, resolve_model_identifier
16from mindsdb.utilities.json_encoder import CustomJSONEncoder
17from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
18from mindsdb.api.executor.utilities.mysql_to_duckdb_functions import mysql_to_duckdb_fnc
20logger = log.getLogger(__name__)
def _get_query_tables(query: ASTNode, resolve_function: callable, default_database: str = None) -> List[tuple]:
    """Collect every table/model identifier referenced by the query.

    Args:
        query (ASTNode): query to inspect
        resolve_function (callable): converts a table identifier into a tuple whose first element is the db/project name
        default_database (str): name substituted when the resolved tuple has no db/project name (first element is None)

    Returns:
        List[tuple]: resolved identifiers as (db/project name, table name, version)
    """
    found = []

    def _collect(node, is_table, **kwargs):
        # only identifiers that appear in a table position are of interest
        if not (is_table and isinstance(node, Identifier)):
            return
        resolved = resolve_function(node)
        if resolved[0] is None:
            # identifier had no explicit database: fall back to the default one
            resolved = (default_database,) + resolved[1:]
        found.append(resolved)

    query_traversal(query, _collect)
    return found
def get_query_tables(query: ASTNode, default_database: str = None) -> List[tuple]:
    """Find all tables referenced in the query, resolving each with `resolve_table_identifier`.

    Args:
        query (ASTNode): query to inspect
        default_database (str): database name used when an identifier has no db name

    Returns:
        List[tuple]: one resolved tuple per table identifier found in the query
    """
    return _get_query_tables(
        query,
        resolve_function=resolve_table_identifier,
        default_database=default_database,
    )
def get_query_models(query: ASTNode, default_database: str = None) -> List[tuple]:
    """Find all models referenced in the query, resolving each with `resolve_model_identifier`.

    Args:
        query (ASTNode): query to inspect
        default_database (str): project name used when an identifier has no project name

    Returns:
        List[tuple]: one resolved tuple per table-position identifier found in the query
    """
    return _get_query_tables(
        query,
        resolve_function=resolve_model_identifier,
        default_database=default_database,
    )
def query_df_with_type_infer_fallback(query_str: str, dataframes: dict, user_functions=None):
    """Run a query over dataframes in an in-memory DuckDB, retrying with larger type-inference samples.

    DuckDB needs to infer column types when column.dtype == object. By default it samples 1000 rows,
    which may not be sufficient in some cases. The query is therefore attempted with
    `pandas_analyze_sample` set to 1000, then 10000, then 1000000, stopping at the first success.

    Args:
        query_str (str): query to execute
        dataframes (dict): mapping of name -> dataframe; each entry is registered in the connection under its name
        user_functions: functions controller which registers new functions in the connection

    Returns:
        tuple: (pandas.DataFrame with the query result, `description` of the connection after execution)

    Raises:
        QueryError: Raised when DuckDB fails to execute the query
    """
    sample_sizes = (1000, 10000, 1000000)

    try:
        with duckdb.connect(database=":memory:") as con:
            if user_functions:
                user_functions.register(con)

            for frame_name, frame in dataframes.items():
                con.register(frame_name, frame)

            last_error = None
            for sample_size in sample_sizes:
                try:
                    con.execute(f"set global pandas_analyze_sample={sample_size};")
                    result_df = con.execute(query_str).fetchdf()
                    break
                except InvalidInputException as e:
                    # remember the failure and retry with a bigger sample
                    last_error = e
            else:
                # every sample size failed: surface the last error
                raise last_error
            description = con.description
    except Exception as e:
        if isinstance(e, InvalidInputException):
            error_msg = f"DuckDB failed to execute query, likely due to inability to determine column data types. Details: {e}"
        else:
            error_msg = str(e)
        raise QueryError(
            db_type="DuckDB",
            db_error_msg=error_msg,
            failed_query=query_str,
            is_external=False,
            is_expected=False,
        ) from e

    return result_df, description
# Module-level cache filled on the first successful call of get_duckdb_functions_and_kw_list()
_duckdb_functions_and_kw_list = None


def get_duckdb_functions_and_kw_list() -> list[str] | None:
    """Return the names of all functions and keywords supported by DuckDB.

    The list is a merge of:
     - DuckDB's functions, as reported by `duckdb_functions()`
     - DuckDB's keywords, as reported by `duckdb_keywords()`, because some functions
       are just syntax sugar and are not present in `duckdb_functions()` (like 'if()')
     - a hardcoded list of window functions, because they are not present
       in `duckdb_functions()`

    The result is cached at module level after the first successful retrieval.
    A failed retrieval is logged as a warning and is not cached.

    Returns:
        list[str] | None: List of supported functions and keywords, or None if unable to retrieve the list.
    """
    global _duckdb_functions_and_kw_list

    if _duckdb_functions_and_kw_list is not None:
        return _duckdb_functions_and_kw_list

    window_functions_list = [
        "cume_dist",
        "dense_rank",
        "first_value",
        "lag",
        "last_value",
        "lead",
        "nth_value",
        "ntile",
        "percent_rank",
        "rank_dense",
        "rank",
        "row_number",
    ]
    names_query = """
                select distinct name
                from (
                    select function_name as name from duckdb_functions()
                    union all
                    select keyword_name as name from duckdb_keywords()
                ) ta;
                """
    try:
        df, _ = query_df_with_type_infer_fallback(names_query, dataframes={})
        df.columns = [name.lower() for name in df.columns]
        supported_names = df["name"].drop_duplicates().str.lower().to_list()
        _duckdb_functions_and_kw_list = supported_names + window_functions_list
    except Exception as e:
        logger.warning(f"Unable to get DuckDB functions list: {e}")

    return _duckdb_functions_and_kw_list
def query_df(df, query, session=None):
    """Run a simple query ('select' from one table, without subqueries and joins) on a DataFrame.

    The query is parsed (if given as a string) or deep-copied (if given as an AST), so the
    caller's AST is not modified. Its table is then renamed to 'df' before execution.

    Args:
        df (pandas.DataFrame): data
        query (mindsdb_sql_parser.ast.Select | str): select query
        session: optional session object, passed through to `query_dfs`

    Returns:
        pandas.DataFrame

    Raises:
        QueryError: if the query is not a SELECT whose FROM clause is a plain table identifier
    """
    if isinstance(query, str):
        query_ast, query_str = parse_sql(query), query
    else:
        query_ast, query_str = copy.deepcopy(query), str(query)

    is_simple_select = isinstance(query_ast, Select) and isinstance(query_ast.from_table, Identifier)
    if not is_simple_select:
        raise QueryError(
            db_type="DuckDB",
            db_error_msg="Only 'SELECT from TABLE' statements supported for internal query",
            failed_query=query_str,
            is_external=False,
            is_expected=False,
        )

    # the dataframe is exposed to the query executor under the fixed name 'df'
    query_ast.from_table.parts = ["df"]

    return query_dfs({"df": df}, query_ast, session=session)
def query_dfs(dataframes, query_ast, session=None):
    """Execute a query over one or more DataFrames using DuckDB.

    Before execution the AST is adapted for DuckDB:
     - multi-part column identifiers are reduced to their last part
     - MySQL functions are converted via `mysql_to_duckdb_fnc` where a converter exists
     - `database()` is replaced with a constant, `truncate` is renamed to `round`
     - any other function must be known to DuckDB or to the user-defined functions

    Args:
        dataframes (dict): mapping of table name -> pandas.DataFrame. Frames may be modified in place:
            columns used as the first argument of `json_extract` get their dict/list values serialized to JSON strings.
        query_ast (ASTNode): query to execute; it is modified in place
        session: optional session. When given, `session.function_controller` supplies user-defined
            functions and `session.database` is the value returned for `database()`.

    Returns:
        pandas.DataFrame: query result with NaN replaced by None

    Raises:
        QueryError: if the query uses an unknown function, or if DuckDB fails to execute the query
    """
    json_columns = set()

    user_functions = None
    if session is not None:
        user_functions = session.function_controller.create_function_set()

    def adapt_query(node, is_table, **kwargs):
        if is_table:
            return
        if isinstance(node, Identifier):
            if len(node.parts) > 1:
                # drop table/db qualifiers, keep only the column name
                node.parts = [node.parts[-1]]
                return node
        if isinstance(node, Function):
            fnc = mysql_to_duckdb_fnc(node)
            if fnc is not None:
                node2 = fnc(node)
                if node2 is not None:
                    # copy alias
                    node2.alias = node.alias
                    return node2

            fnc_name = node.op.lower()
            if fnc_name == "database" and len(node.args) == 0:
                cur_db = session.database if session is not None else None
                return Constant(cur_db)
            elif fnc_name == "truncate":
                # replace mysql 'truncate' function to duckdb 'round'
                # NOTE(review): a raw int (not a Constant node) is appended here, as in the original
                # code - confirm the renderer accepts it
                node.op = "round"
                if len(node.args) == 1:
                    node.args.append(0)
            elif fnc_name == "json_extract":
                # Remember the column so its dict/list values are serialized before execution.
                # The first argument may be something other than a plain column (e.g. a nested
                # function call); there is no column to convert in that case.
                if node.args and isinstance(node.args[0], Identifier):
                    json_columns.add(node.args[0].parts[-1])
            else:
                if user_functions is not None:
                    user_functions.check_function(node)

                duckdb_functions_and_kw_list = get_duckdb_functions_and_kw_list() or []
                custom_functions_list = [] if user_functions is None else list(user_functions.functions.keys())
                # the check is skipped when no list of known functions is available at all
                has_known_functions = len(duckdb_functions_and_kw_list) > 0 or len(custom_functions_list) > 0
                if (
                    has_known_functions
                    and fnc_name not in duckdb_functions_and_kw_list
                    and fnc_name not in custom_functions_list
                ):
                    raise QueryError(
                        db_type="DuckDB",
                        db_error_msg=(
                            f"Unknown function: '{fnc_name}'. This function is not recognized during internal query processing.\n"
                            "Please use DuckDB-supported functions instead."
                        ),
                        # The rendered query string does not exist yet while the AST is being
                        # traversed, so the query text is taken from the AST itself.
                        failed_query=str(query_ast),
                        is_external=False,
                        is_expected=False,
                    )

    query_traversal(query_ast, adapt_query)

    def _convert(v):
        # serialize dict/list values to a JSON string; any other value is returned unchanged
        if isinstance(v, (dict, list)):
            try:
                default_encoder = CustomJSONEncoder().default
                return orjson.dumps(
                    v, default=default_encoder, option=orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_PASSTHROUGH_DATETIME
                ).decode("utf-8")
            except Exception:
                # best effort: keep the original value if it can not be serialized
                pass
        return v

    render = SqlalchemyRender("postgres")
    try:
        query_str = render.get_string(query_ast, with_failback=False)
    except Exception:
        logger.exception(f"Exception during query casting to 'postgres' dialect. Query:\n{str(query_ast)}.\nError:")
        query_str = render.get_string(query_ast, with_failback=True)

    for table_name, df in dataframes.items():
        for column in json_columns:
            # with several dataframes a json column belongs to only some of them
            if column in df.columns:
                df[column] = df[column].apply(_convert)

        if len(df) > 0:
            # workaround to prevent duckdb.TypeMismatchException
            for sys_name, sys_col in (
                ("models", "TRAINING_OPTIONS"),
                ("predictors", "TRAINING_OPTIONS"),
                ("ml_engines", "CONNECTION_DATA"),
            ):
                # NOTE(review): `in` on a str is a substring test, kept as in the original code -
                # confirm whether an exact name match was intended
                if table_name.lower() in sys_name and sys_col in df.columns:
                    df[sys_col] = df[sys_col].astype("string")

    result_df, description = query_df_with_type_infer_fallback(query_str, dataframes, user_functions=user_functions)
    result_df.replace({np.nan: None}, inplace=True)
    # the first element of each description entry is used as the column name
    result_df.columns = [x[0] for x in description]
    return result_df