Coverage for mindsdb / api / executor / utilities / sql.py: 57%

137 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import copy 

2from typing import List 

3 

4import duckdb 

5from duckdb import InvalidInputException 

6import numpy as np 

7import orjson 

8 

9from mindsdb_sql_parser import parse_sql 

10from mindsdb_sql_parser.ast import ASTNode, Select, Identifier, Function, Constant 

11 

12from mindsdb.integrations.utilities.query_traversal import query_traversal 

13from mindsdb.utilities import log 

14from mindsdb.utilities.exception import QueryError 

15from mindsdb.utilities.functions import resolve_table_identifier, resolve_model_identifier 

16from mindsdb.utilities.json_encoder import CustomJSONEncoder 

17from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

18from mindsdb.api.executor.utilities.mysql_to_duckdb_functions import mysql_to_duckdb_fnc 

19 

20logger = log.getLogger(__name__) 

21 

22 

def _get_query_tables(query: ASTNode, resolve_function: callable, default_database: str = None) -> List[tuple]:
    """Collect every identifier that appears in a table position of ``query``.

    Args:
        query (ASTNode): parsed query to inspect
        resolve_function (callable): turns an ``Identifier`` into a tuple whose
            first item is the db/project name (``None`` when the identifier has none)
        default_database (str): used as the first tuple item when the resolved
            identifier has no db/project name

    Returns:
        List[tuple]: resolved tuples in traversal order,
            e.g. (db/project name, table name, version)
    """
    found = []

    def _collect(node, is_table, **kwargs):
        # only identifiers in a table position (FROM / JOIN targets) are of interest
        if not (is_table and isinstance(node, Identifier)):
            return
        resolved = resolve_function(node)
        if resolved[0] is None:
            resolved = (default_database,) + resolved[1:]
        found.append(resolved)

    query_traversal(query, _collect)
    return found

45 

46 

def get_query_tables(query: ASTNode, default_database: str = None) -> List[tuple]:
    """Return resolved tuples for every table referenced in ``query``.

    Identifiers are resolved with ``resolve_table_identifier``; a missing
    database name is replaced with ``default_database``.
    """
    return _get_query_tables(
        query,
        resolve_function=resolve_table_identifier,
        default_database=default_database,
    )

49 

50 

def get_query_models(query: ASTNode, default_database: str = None) -> List[tuple]:
    """Return resolved tuples for every model referenced in ``query``.

    Identifiers are resolved with ``resolve_model_identifier``; a missing
    project name is replaced with ``default_database``.
    """
    return _get_query_tables(
        query,
        resolve_function=resolve_model_identifier,
        default_database=default_database,
    )

53 

54 

def query_df_with_type_infer_fallback(query_str: str, dataframes: dict, user_functions=None):
    """Run ``query_str`` in an in-memory DuckDB database over ``dataframes``.

    DuckDB infers the type of ``object``-dtype columns from a sample of rows
    (``pandas_analyze_sample``, 1000 by default), which may be too small for
    some data. When execution raises ``InvalidInputException`` the query is
    retried with larger samples (1000, 10000, then 1000000 rows).

    Args:
        query_str (str): SQL to execute
        dataframes (dict): table name -> dataframe; each is registered on the
            connection under its key
        user_functions: optional functions controller; its ``register`` method
            is called with the connection before any query runs

    Returns:
        tuple: (pandas.DataFrame with the result, ``con.description`` of the
            executed query)

    Raises:
        QueryError: raised when DuckDB fails to execute the query; the original
            exception is chained as the cause
    """
    sample_sizes = (1000, 10000, 1000000)
    try:
        with duckdb.connect(database=":memory:") as con:
            if user_functions:
                user_functions.register(con)

            for table_name, table_df in dataframes.items():
                con.register(table_name, table_df)

            last_error = None
            for sample_size in sample_sizes:
                try:
                    con.execute(f"set global pandas_analyze_sample={sample_size};")
                    result_df = con.execute(query_str).fetchdf()
                    break
                except InvalidInputException as e:
                    # most likely a type-inference failure: retry with a bigger sample
                    last_error = e
            else:
                # every sample size failed: surface the last error
                raise last_error

            description = con.description
    except InvalidInputException as e:
        raise QueryError(
            db_type="DuckDB",
            db_error_msg=f"DuckDB failed to execute query, likely due to inability to determine column data types. Details: {e}",
            failed_query=query_str,
            is_external=False,
            is_expected=False,
        ) from e
    except Exception as e:
        raise QueryError(
            db_type="DuckDB", db_error_msg=str(e), failed_query=query_str, is_external=False, is_expected=False
        ) from e

    return result_df, description

107 

108 

109_duckdb_functions_and_kw_list = None 

110 

111 

112def get_duckdb_functions_and_kw_list() -> list[str] | None: 

113 """Returns a list of all functions and keywords supported by DuckDB. 

114 The list is merge of: 

115 - list of duckdb's functions: 'select * from duckdb_functions()' or 'pragma functions' 

116 - ist of keywords, because of some functions are just sintax-sugar 

117 and not present in the duckdb_functions (like 'if()'). 

118 - hardcoded list of window_functions, because there are no way to get if from duckdb, 

119 and they are not present in the duckdb_functions() 

120 

121 Returns: 

122 list[str] | None: List of supported functions and keywords, or None if unable to retrieve the list. 

123 """ 

124 global _duckdb_functions_and_kw_list 

125 window_functions_list = [ 

126 "cume_dist", 

127 "dense_rank", 

128 "first_value", 

129 "lag", 

130 "last_value", 

131 "lead", 

132 "nth_value", 

133 "ntile", 

134 "percent_rank", 

135 "rank_dense", 

136 "rank", 

137 "row_number", 

138 ] 

139 if _duckdb_functions_and_kw_list is None: 

140 try: 

141 df, _ = query_df_with_type_infer_fallback( 

142 """ 

143 select distinct name 

144 from ( 

145 select function_name as name from duckdb_functions() 

146 union all 

147 select keyword_name as name from duckdb_keywords() 

148 ) ta; 

149 """, 

150 dataframes={}, 

151 ) 

152 df.columns = [name.lower() for name in df.columns] 

153 _duckdb_functions_and_kw_list = df["name"].drop_duplicates().str.lower().to_list() + window_functions_list 

154 except Exception as e: 

155 logger.warning(f"Unable to get DuckDB functions list: {e}") 

156 

157 return _duckdb_functions_and_kw_list 

158 

159 

def query_df(df, query, session=None):
    """Perform a simple query ('select' from one table, without subqueries and joins) on a DataFrame.

    Whatever table the query selects from, it is redirected to ``df``.

    Args:
        df (pandas.DataFrame): data to query
        query (mindsdb_sql_parser.ast.Select | str): select query, as AST or SQL text.
            An AST argument is deep-copied, so the caller's object is not modified.
        session: optional session, forwarded to ``query_dfs``

    Returns:
        pandas.DataFrame

    Raises:
        QueryError: if the query is not a SELECT from a single table identifier
            (``query_dfs`` may also raise it while executing)
    """
    if isinstance(query, str):
        query_ast = parse_sql(query)
    else:
        # work on a copy: FROM is rewritten below and query_dfs adapts the AST in place
        query_ast = copy.deepcopy(query)

    if not isinstance(query_ast, Select) or not isinstance(query_ast.from_table, Identifier):
        raise QueryError(
            db_type="DuckDB",
            db_error_msg="Only 'SELECT from TABLE' statements supported for internal query",
            # the query text is only rendered when it is actually needed
            failed_query=query if isinstance(query, str) else str(query),
            is_external=False,
            is_expected=False,
        )

    query_ast.from_table.parts = ["df"]

    return query_dfs({"df": df}, query_ast, session=session)

190 

191 

def query_dfs(dataframes, query_ast, session=None):
    """Execute ``query_ast`` with DuckDB over in-memory dataframes.

    Before execution the AST is adapted in place:
      - qualified (non-table) identifiers keep only their last part;
      - functions with a MySQL->DuckDB converter are replaced by the converted node;
      - ``database()`` becomes a constant holding the session's current database;
      - ``truncate(x[, n])`` is rewritten to ``round(x, n)`` (``n`` defaults to 0);
      - identifiers passed as the first argument of ``json_extract`` are remembered,
        so dict/list values in those columns are serialized to JSON text first;
      - other functions are checked against user-defined functions (when a session
        is given) and against the DuckDB function/keyword list.

    Args:
        dataframes (dict): table name -> pandas.DataFrame, registered in DuckDB under
            its key. NOTE: json columns and some system-table columns are converted
            in place on these dataframes.
        query_ast (ASTNode): query to execute; modified in place
        session: optional session; ``session.database`` and
            ``session.function_controller`` are used when provided

    Returns:
        pandas.DataFrame: result with NaN replaced by None and columns named after
            the DuckDB result description

    Raises:
        QueryError: if the query uses an unknown function or DuckDB fails to run it
    """
    json_columns = set()
    user_functions = None if session is None else session.function_controller.create_function_set()

    # DuckDB function/keyword names; loaded on first use and reused for every node of this query.
    builtin_names = None

    def _is_known_function(fnc_name):
        """Return False only if ``fnc_name`` is missing from a non-empty set of known functions."""
        nonlocal builtin_names
        if builtin_names is None:
            builtin_names = set(get_duckdb_functions_and_kw_list() or [])
        # Custom functions are re-read on every call, in case check_function() registers new ones.
        custom_names = () if user_functions is None else user_functions.functions.keys()
        if not builtin_names and not custom_names:
            # nothing to validate against: do not reject the function
            return True
        return fnc_name in builtin_names or fnc_name in custom_names

    def adapt_query(node, is_table, **kwargs):
        if is_table:
            return
        if isinstance(node, Identifier):
            # keep only the last part of qualified identifiers
            if len(node.parts) > 1:
                node.parts = [node.parts[-1]]
                return node
        if not isinstance(node, Function):
            return

        converter = mysql_to_duckdb_fnc(node)
        if converter is not None:
            converted = converter(node)
            if converted is not None:
                # copy alias
                converted.alias = node.alias
                return converted

        fnc_name = node.op.lower()
        if fnc_name == "database" and len(node.args) == 0:
            return Constant(session.database if session is not None else None)

        if fnc_name == "truncate":
            # replace mysql 'truncate' function to duckdb 'round'
            # NOTE(review): round() rounds while MySQL TRUNCATE() truncates
            # (e.g. 1.99 -> 2 vs 1); kept as-is to preserve existing behavior.
            node.op = "round"
            if len(node.args) == 1:
                # must be an AST node, not a raw int, so the AST can be rendered
                node.args.append(Constant(0))
        elif fnc_name == "json_extract":
            first_arg = node.args[0] if node.args else None
            # only a plain column reference can be pre-serialized
            if isinstance(first_arg, Identifier):
                json_columns.add(first_arg.parts[-1])
        elif user_functions is not None:
            user_functions.check_function(node)

        if not _is_known_function(fnc_name):
            raise QueryError(
                db_type="DuckDB",
                db_error_msg=(
                    f"Unknown function: '{fnc_name}'. This function is not recognized during internal query processing.\n"
                    "Please use DuckDB-supported functions instead."
                ),
                # the final SQL string does not exist yet at traversal time,
                # so the (partially adapted) AST is rendered instead
                failed_query=str(query_ast),
                is_external=False,
                is_expected=False,
            )

    query_traversal(query_ast, adapt_query)

    default_encoder = CustomJSONEncoder().default
    json_options = orjson.OPT_SERIALIZE_NUMPY | orjson.OPT_PASSTHROUGH_DATETIME

    def _convert(value):
        """Serialize dict/list values to JSON text; return anything else unchanged."""
        if isinstance(value, (dict, list)):
            try:
                return orjson.dumps(value, default=default_encoder, option=json_options).decode("utf-8")
            except Exception:
                # best effort: values orjson cannot serialize are left as they are
                pass
        return value

    render = SqlalchemyRender("postgres")
    try:
        query_str = render.get_string(query_ast, with_failback=False)
    except Exception:
        logger.exception(f"Exception during query casting to 'postgres' dialect. Query:\n{str(query_ast)}.\nError:")
        query_str = render.get_string(query_ast, with_failback=True)

    for table_name, df in dataframes.items():
        for column in json_columns:
            # with several dataframes (e.g. joins) a json column may exist in only some of them
            if column in df.columns:
                df[column] = df[column].apply(_convert)

        if len(df) > 0:
            # workaround to prevent duckdb.TypeMismatchException
            for sys_name, sys_col in (
                ("models", "TRAINING_OPTIONS"),
                ("predictors", "TRAINING_OPTIONS"),
                ("ml_engines", "CONNECTION_DATA"),
            ):
                # NOTE(review): substring test, so e.g. 'model' also matches 'models'; equality may be intended
                if table_name.lower() in sys_name and sys_col in df.columns:
                    df[sys_col] = df[sys_col].astype("string")

    result_df, description = query_df_with_type_infer_fallback(query_str, dataframes, user_functions=user_functions)
    result_df.replace({np.nan: None}, inplace=True)
    result_df.columns = [x[0] for x in description]
    return result_df