Coverage for mindsdb / api / executor / planner / query_prepare.py: 71%
278 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import copy
2from mindsdb_sql_parser import ast
4from mindsdb.api.executor.planner.exceptions import PlanningException
5from mindsdb.api.executor.planner import steps
6from mindsdb.api.executor.planner import utils
def to_string(identifier):
    """Render an identifier as its dot-separated parts, without quoting.

    Unquoted alternative to the AST node's own ``to_string()``.

    Args:
        identifier: object exposing a ``parts`` sequence of strings.

    Returns:
        The parts joined with ``"."``.
    """
    parts = identifier.parts
    return ".".join(parts)
class Table:
    """A table (or predictor) referenced by a prepared statement.

    Attributes are plain data filled in by the planner:
      node          -- AST identifier the table was built from
      ds            -- datasource / namespace the table belongs to
      is_predictor  -- flag passed in by the caller
      name          -- unquoted dotted name derived from ``node``
      alias         -- unquoted alias of ``node``, or None when it has none
      columns       -- column list, None until fetched
      columns_map   -- columns keyed by name, None while unknown
      keys          -- names the table can be looked up by, None until set
    """

    def __init__(self, node=None, ds=None, is_predictor=None):
        # NOTE: ``node`` defaults to None in the signature, but it is
        # dereferenced below, so callers have to pass a real identifier.
        self.node = node
        self.is_predictor = is_predictor
        self.ds = ds
        self.name = to_string(node)

        # Column metadata is unknown (None) until the planner fetches it.
        self.columns = None
        self.columns_map = None
        self.keys = None

        # The alias stays None when absent; it does not fall back to the name.
        self.alias = to_string(node.alias) if node.alias else None
class Column:
    """A result column of a prepared statement.

    Attributes:
      node     -- AST node the column is linked to (may be None)
      alias    -- output alias, always None after construction
      is_star  -- False after construction; callers flip it for ``*`` targets
      table    -- the table object the column belongs to, or None
      name     -- column name
      type     -- column type name
    """

    def __init__(self, node=None, table=None, name=None, type=None):
        if node is None:
            # No AST node supplied: synthesise one when both the table and
            # the column name are known.
            if table is not None and name is not None:
                node = ast.Identifier(parts=[table.name, name])
        elif isinstance(node, ast.Identifier):
            # The column name is the last part of the identifier.
            name = node.parts[-1]

        self.node = node
        self.alias = None
        self.is_star = False
        self.table = table
        self.name = name
        self.type = type
class Statement:
    """Mutable state of one prepared statement.

    Attributes:
      columns      -- result columns, starts as an empty list
      params       -- statement parameters, None until set
      result       -- starts as None
      tables_lvl1  -- tables on the first level of a select, None until set
      tables_map   -- tables keyed by name, e.g. {'ta.ble': Table()}
      offset       -- starts at 0
    """

    def __init__(self):
        self.columns = []
        self.params = None
        self.result = None

        # Both table registries are populated while a select is prepared.
        self.tables_lvl1 = None
        self.tables_map = None

        self.offset = 0
class PreparedStatementPlanner:
    """Prepare/execute helper that sits on top of a query planner.

    Preparing a query stores a ``Statement`` on the wrapped planner and fills
    it with result-column and parameter metadata; executing substitutes the
    parameters and delegates planning to the wrapped planner.
    """

    def __init__(self, planner):
        self.planner = planner

    def get_type_of_var(self, v):
        """Return the type name reported for a constant value.

        Floats map to ``"float"``, ints (including bools) to ``"integer"``;
        everything else, strings included, is reported as ``"str"``.
        """
        if isinstance(v, float):
            return "float"
        if isinstance(v, int):
            return "integer"
        return "str"

    def get_statement_info(self):
        """Describe the prepared statement's parameters and result columns.

        Returns:
            ``{"parameters": [...], "columns": [...]}`` where every entry is a
            plain dict.

        Raises:
            PlanningException: no statement is stored on the planner.
        """
        stmt = self.planner.statement
        if stmt is None:
            raise PlanningException("Statement is not prepared")

        columns_result = []
        for column in stmt.columns:
            table_name, ds = None, None
            if column.table is not None:
                table_name = column.table.name
                ds = column.table.ds
            columns_result.append(
                {
                    "alias": column.alias,
                    "type": column.type,
                    "name": column.name,
                    "table_name": table_name,
                    # NOTE(review): reports the table name, not Table.alias --
                    # confirm this is what consumers expect.
                    "table_alias": table_name,
                    "ds": ds,
                }
            )

        # ``params`` is None on a fresh Statement and after execute_steps()
        # has reset it; treat that as "no parameters" instead of crashing.
        parameters = [
            {"alias": "?", "type": "str", "name": "?"}
            for _ in (stmt.params or [])
        ]

        return {"parameters": parameters, "columns": columns_result}

    def get_table_of_column(self, t):
        """Find the first-level table a column identifier is qualified with.

        The qualifier (all parts but the last) is looked up in the
        statement's ``tables_map``; if that fails and the qualifier has more
        than one part, its first part is dropped (it may be a datasource) and
        the lookup is retried.

        Returns:
            The mapped table, or None for an unqualified or unknown column.
        """
        tables_map = self.planner.statement.tables_map

        qualifier = t.parts[:-1]
        if not qualifier:
            return None

        candidates = [qualifier]
        if len(qualifier) > 1:
            # maybe the datasource is the 1st part
            candidates.append(qualifier[1:])

        for candidate in candidates:
            table_name = ".".join(candidate)
            if table_name in tables_map:
                return tables_map[table_name]
        return None

    def table_from_identifier(self, table):
        """Build a ``Table`` for an identifier and compute its lookup keys.

        The planner decides whether the identifier is a predictor and
        resolves it into a namespace and a table node. A table with an alias
        is reachable by that alias only; otherwise by every dotted suffix of
        its name (``c``, ``b.c``, ``a.b.c``).
        """
        if self.planner.is_predictor(table):
            resolve = self.planner.get_predictor_namespace_and_name_from_identifier
            is_predictor = True
        else:
            resolve = self.planner.resolve_database_table
            is_predictor = False
        ds, node = resolve(table)

        if node.alias is not None:
            keys = [to_string(node.alias)]
        else:
            parts = node.parts
            # shortest suffix first
            keys = [
                ".".join(parts[start:])
                for start in range(len(parts) - 1, -1, -1)
            ]

        tbl = Table(ds=ds, node=node, is_predictor=is_predictor)
        tbl.keys = keys
        return tbl

    def prepare_select(self, query):
        """Generator that resolves the result columns of a select.

        Yields one column-fetching step per first-level table that is needed;
        after each yield it reads ``step.result_data`` back from the yielded
        step, so the consumer is expected to have filled it in by then. When
        the generator is exhausted, the statement's ``columns`` holds the
        resolved column list.

        Raises:
            PlanningException: a predictor is not the last first-level table
                or appears below the first level, ``*`` is used without any
                known table, a table's columns are missing for ``*``, or a
                qualified column is absent from its table.
        """
        stmt = self.planner.statement

        # --- collect predictors used anywhere in the query ---
        query_predictors = []

        def find_predictors(node, is_table, **kwargs):
            if is_table and isinstance(node, ast.Identifier) and self.planner.is_predictor(node):
                query_predictors.append(node)

        utils.query_traversal(query, find_predictors)

        # --- collect tables from the first level of the query ---
        stmt.tables_map = {}
        stmt.tables_lvl1 = []
        from_table = query.from_table
        if from_table is not None:
            if isinstance(from_table, ast.Join):
                join_tables = utils.convert_join_to_list(from_table)
            else:
                join_tables = [dict(table=from_table)]

            if isinstance(from_table, ast.Select):
                # nested select: only the innermost select's source is used
                join_tables = [dict(table=utils.get_deepest_select(from_table).from_table)]

            last_position = len(join_tables) - 1
            for position, join_table in enumerate(join_tables):
                table_node = join_table["table"]
                if not isinstance(table_node, ast.Identifier):
                    # unknown kinds of sources are not added to the lookup list
                    continue

                tbl = self.table_from_identifier(table_node)
                if tbl.is_predictor and position != last_position:
                    raise PlanningException("Predictor must be last table in query")

                stmt.tables_lvl1.append(tbl)
                for key in tbl.keys:
                    stmt.tables_map[key] = tbl

        # every predictor found by the traversal must be a first-level table
        lvl1_predictors = [tbl for tbl in stmt.tables_lvl1 if tbl.is_predictor]
        if len(query_predictors) != len(lvl1_predictors):
            raise PlanningException("Predictor is not at first level")

        # --- analyse targets ---
        columns = []
        get_all_tables = False
        for t in query.targets:
            column = Column(t)
            alias = to_string(t.alias) if t.alias is not None else None

            if isinstance(t, ast.Star):
                if not stmt.tables_lvl1:
                    # if "from" is empty we can't make a plan
                    raise PlanningException("Can't find table")
                column.is_star = True
                get_all_tables = True

            elif isinstance(t, ast.Identifier):
                if alias is None:
                    alias = t.parts[-1]
                owner = self.get_table_of_column(t)
                if owner is None:
                    # owning table is unknown: every table has to be inspected
                    get_all_tables = True
                else:
                    column.table = owner

            elif isinstance(t, ast.Constant):
                if alias is None:
                    alias = str(t.value)
                column.type = self.get_type_of_var(t.value)

            elif isinstance(t, ast.Function):
                # mysql function
                column.type = "integer" if t.op == "connection_id" else "str"

            else:
                # TODO go down into lower level: it can be a function,
                #   an operation or a select. For now report it as a string.
                # TODO add several known types for functions, i.e. ABS-int
                # TODO TypeCast - as casted type
                column.type = "str"

            if alias is not None:
                column.alias = alias
            columns.append(column)

        # --- fetch columns of the tables that are needed ---
        request_tables = {
            column.table.name for column in columns if column.table is not None
        }

        for table in stmt.tables_lvl1:
            if not (get_all_tables or table.name in request_tables):
                continue

            if table.is_predictor:
                step = steps.GetPredictorColumns(namespace=table.ds, predictor=table.node)
            else:
                step = steps.GetTableColumns(namespace=table.ds, table=table.name)
            yield step

            if step.result_data is not None:
                fetched = step.result_data.columns
                table.columns = fetched
                table.columns_map = {col.name.upper(): col for col in fetched}

        # --- build the final column list ---
        columns_result = []
        for i, column in enumerate(columns):
            if column.is_star:
                # expand "*" into the columns of every first-level table
                for table in stmt.tables_lvl1:
                    if table.columns is None:
                        raise PlanningException(f"Table is not found {table.name}")

                    for col in table.columns:
                        expanded = Column(table=table, name=col.name)
                        expanded.alias = col.name
                        expanded.type = col.type
                        columns_result.append(expanded)
                continue

            if column.name is not None:
                # target is an identifier
                if isinstance(column.name, ast.Star):
                    continue
                col_name = column.name.upper()

                if column.table is not None:
                    columns_map = column.table.columns_map
                    if columns_map is not None:
                        if col_name not in columns_map:
                            raise PlanningException(f"Column not found {col_name}")
                        column.type = columns_map[col_name].type
                else:
                    # owning table is unknown: take the first table that has it
                    for table in stmt.tables_lvl1:
                        if table.columns_map is None:
                            continue
                        found = table.columns_map.get(col_name)
                        if found is not None:
                            column.type = found.type
                            column.table = table
                            break

            # forcing alias
            if column.alias is None:
                column.alias = f"column_{i}"

            # forcing type
            if column.type is None:
                column.type = "str"

            columns_result.append(column)

        stmt.columns = columns_result

    def prepare_insert(self, query):
        """Generator that resolves the column types of an insert's target.

        Yields a single column-fetching step and reads ``step.result_data``
        back from it afterwards. When exhausted, the statement's ``columns``
        holds one column per entry of ``query.columns``; types that cannot be
        resolved fall back to ``"str"``.
        """
        stmt = self.planner.statement

        table = self.table_from_identifier(query.table)
        if table.is_predictor:
            step = steps.GetPredictorColumns(namespace=table.ds, predictor=table.node)
        else:
            step = steps.GetTableColumns(namespace=table.ds, table=table.name)
        yield step

        # NOTE(review): result_data is indexed like a mapping here but read
        # through attributes in prepare_select -- confirm the expected shape.
        result = step.result_data
        if result is not None and len(result["tables"]) > 0:
            table_info = result["tables"][0]
            columns_info = result["columns"][table_info]

            table.ds = table_info[0]
            table.columns = []
            for info in columns_info:
                if isinstance(info, tuple):
                    # tuple entries (predictor columns) carry no type
                    info = dict(name=info[0], type="str")
                table.columns.append(Column(name=info["name"], type=info["type"]))

            # keyed by upper-cased name
            table.columns_map = {col.name.upper(): col for col in table.columns}

        columns_result = []
        for target in query.columns:
            col_name = target.parts[-1]
            column = Column(table=table, name=col_name)

            if table.columns_map is not None:
                # The map keys are upper-cased, so the lookup has to be too;
                # otherwise lower/mixed-case names never resolve their type.
                known = table.columns_map.get(col_name.upper())
                if known is not None:
                    column.type = known.type

            if column.type is None:
                # forcing type
                column.type = "str"

            columns_result.append(column)

        stmt.columns = columns_result

    def prepare_show(self, query):
        """Set the fixed two-column result of a SHOW and return no steps."""
        stmt = self.planner.statement
        stmt.columns = [
            Column(name="Variable_name", type="str"),
            Column(name="Value", type="str"),
        ]
        return []

    def prepare_steps(self, query):
        """Start a new prepared statement for ``query``.

        A fresh ``Statement`` and the original query are stored on the
        planner; analysis runs on a deep copy of the query.

        Returns:
            An iterable of steps to run: the ``prepare_select`` generator for
            selects (and the left side of unions), the result of
            ``prepare_show`` for SHOW, and an empty list otherwise.
        """
        stmt = Statement()
        self.planner.statement = stmt
        self.planner.query = query

        query = copy.deepcopy(query)
        stmt.params = utils.get_query_params(query)

        if isinstance(query, ast.Select):
            return self.prepare_select(query)
        if isinstance(query, ast.Union):
            # column definitions are taken from the left select only
            return self.prepare_select(query.left)
        if isinstance(query, (ast.Insert, ast.Delete)):
            # TODO do we need columns?
            return []
        if isinstance(query, ast.Show):
            return self.prepare_show(query)
        # nothing to prepare for other statements
        return []

    def execute_steps(self, params=None):
        """Bind ``params`` to the prepared query and plan its execution.

        Args:
            params: values for the statement's parameters, or None to run
                the stored query as it is.

        Returns:
            The ``plan_query`` generator for the supported statement kinds,
            otherwise an empty list.

        Raises:
            PlanningException: parameters are given but there is no prepared
                statement, its parameters were already consumed by an earlier
                execution, or the parameter count does not match.
        """
        stmt = self.planner.statement
        if stmt is None:
            if params is not None:
                raise PlanningException("Can't execute statement")
            stmt = Statement()

        query = self.planner.query

        if params is not None:
            if stmt.params is None:
                # params were reset by a previous execution (or never set);
                # fail with a planner error rather than a TypeError on len().
                raise PlanningException("Can't execute statement: parameters are not prepared")
            if len(params) != len(stmt.params):
                raise PlanningException("Count of execution parameters don't match prepared statement")

            query = utils.fill_query_params(query, params)
            self.planner.query = query

        # prevent from second execution
        stmt.params = None

        plannable = (
            ast.Select,
            ast.Union,
            ast.CreateTable,
            ast.Insert,
            ast.Update,
            ast.Delete,
            ast.Intersect,
            ast.Except,
        )
        if isinstance(query, plannable):
            return self.plan_query(query)
        return []

    def plan_query(self, query):
        """Generator: plan ``query`` with the wrapped planner, yield its steps.

        Planning happens lazily, on the first iteration of the generator.
        """
        # use v1 planner
        self.planner.from_query(query)
        for planned_step in self.planner.plan.steps:
            yield planned_step