Coverage for mindsdb/api/executor/planner/query_planner.py: 84% (504 statements)
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import copy
3import pandas as pd
5from mindsdb_sql_parser import ast
6from mindsdb_sql_parser.ast import (
7 Select,
8 Identifier,
9 Join,
10 Star,
11 BinaryOperation,
12 Constant,
13 Union,
14 CreateTable,
15 Function,
16 Insert,
17 Except,
18 Intersect,
19 Update,
20 NativeQuery,
21 Parameter,
22 Delete,
23)
25from mindsdb.api.executor.planner.exceptions import PlanningException
26from mindsdb.api.executor.planner import utils
27from mindsdb.api.executor.planner.query_plan import QueryPlan
28from mindsdb.api.executor.planner.steps import (
29 PlanStep,
30 FetchDataframeStep,
31 ProjectStep,
32 ApplyPredictorStep,
33 ApplyPredictorRowStep,
34 UnionStep,
35 GetPredictorColumns,
36 SaveToTable,
37 InsertToTable,
38 UpdateToTable,
39 SubSelectStep,
40 QueryStep,
41 JoinStep,
42 DeleteStep,
43 DataStep,
44 CreateTableStep,
45 FetchDataframeStepPartition,
46)
47from mindsdb.api.executor.planner.utils import (
48 disambiguate_predictor_column_identifier,
49 recursively_extract_column_values,
50 query_traversal,
51 filters_to_bin_op,
52)
53from mindsdb.api.executor.planner.plan_join import PlanJoin
54from mindsdb.api.executor.planner.query_prepare import PreparedStatementPlanner
55from mindsdb.utilities.config import config
# Name of the project used when a query does not specify one.
default_project = config.get("default_project")

# This includes built-in MindsDB SQL functions and functions to be executed via DuckDB consistently.
MINDSDB_SQL_FUNCTIONS = {"llm", "to_markdown", "hash"}
def _resolve_identifier_part(identifier: Identifier, part: int = -1) -> str:
    """Resolve one part of an identifier for case-insensitive comparison.

    Args:
        identifier (Identifier): the identifier to read the part from.
        part (int): index of the part to resolve (defaults to the last one).

    Returns:
        str: the part lower-cased when it is unquoted, verbatim otherwise.
    """
    value = identifier.parts[part]
    # quoted parts are case-sensitive and must be kept as-is
    return value if identifier.is_quoted[part] else value.lower()
81class QueryPlanner:
    def __init__(
        self,
        query=None,
        integrations: list = None,
        predictor_namespace=None,
        predictor_metadata: list = None,
        default_namespace: str = None,
    ):
        """Build a planner over the given integrations and predictor metadata.

        Args:
            query: the query to plan (may also be passed later to from_query).
            integrations: list of integration names (str) or dicts with at
                least 'name' and 'type' keys; non-'data' entries are treated
                as projects/system databases.
            predictor_namespace: legacy default namespace for predictors.
            predictor_metadata: list of predictor dicts (or a legacy dict
                keyed by predictor name).
            default_namespace: namespace assumed for unqualified identifiers.
        """
        self.query = query
        self.plan = QueryPlan()

        _projects = set()
        self.integrations = {}
        if integrations is not None:
            for integration in integrations:
                if isinstance(integration, dict):
                    integration_name = integration["name"]
                    # it is project of system database
                    if integration["type"] != "data":
                        _projects.add(integration_name)
                        continue
                else:
                    # a bare string: normalize to a minimal dict record
                    integration_name = integration
                    integration = {"name": integration}
                self.integrations[integration_name] = integration

        # allow to select from mindsdb namespace
        _projects.add(default_project)

        self.default_namespace = default_namespace

        # legacy parameter
        self.predictor_namespace = predictor_namespace.lower() if predictor_namespace else default_project

        # map for lower names of predictors; keys are '<integration>.<name>' lower-cased
        self.predictor_info = {}
        if isinstance(predictor_metadata, list):
            # convert to dict
            for predictor in predictor_metadata:
                if "integration_name" in predictor:
                    integration_name = predictor["integration_name"]
                else:
                    integration_name = self.predictor_namespace
                    predictor["integration_name"] = integration_name
                idx = f"{integration_name}.{predictor['name']}".lower()
                self.predictor_info[idx] = predictor
                _projects.add(integration_name.lower())
        elif isinstance(predictor_metadata, dict):
            # legacy behaviour: keys may already be qualified with a namespace
            for name, predictor in predictor_metadata.items():
                if "." not in name:
                    if "integration_name" in predictor:
                        integration_name = predictor["integration_name"]
                    else:
                        integration_name = self.predictor_namespace
                        predictor["integration_name"] = integration_name
                    name = f"{integration_name}.{name}".lower()
                    _projects.add(integration_name.lower())
                self.predictor_info[name] = predictor

        self.projects = list(_projects)
        # every name a table identifier's first part may legally resolve to
        self.databases = list(self.integrations.keys()) + self.projects

        self.statement = None

        # results of already-planned CTEs, keyed by resolved CTE name
        self.cte_results = {}
151 def is_predictor(self, identifier):
152 if not isinstance(identifier, Identifier):
153 return False
154 return self.get_predictor(identifier) is not None
156 def get_predictor(self, identifier):
157 name_parts = list(identifier.parts)
159 version = None
160 if len(name_parts) > 1 and name_parts[-1].isdigit():
161 # last part is version
162 version = name_parts[-1]
163 name_parts = name_parts[:-1]
165 name = name_parts[-1]
167 namespace = None
168 if len(name_parts) > 1:
169 namespace = name_parts[-2]
170 else:
171 if self.default_namespace is not None:
172 namespace = self.default_namespace
174 idx_ar = [name]
175 if namespace is not None:
176 idx_ar.insert(0, namespace)
178 idx = ".".join(idx_ar).lower()
179 info = self.predictor_info.get(idx)
180 if info is not None:
181 info["version"] = version
182 info["name"] = name
183 return info
    def prepare_integration_select(self, database, query):
        """Normalize identifiers in *query* before sending it to *database*.

        Strips the integration/database prefix from identifiers and adds a
        column alias to unaliased target columns, mutating *query* in place.
        """
        # replacement for 'utils.recursively_disambiguate_*' functions from utils
        # main purpose: make tests working (don't change planner outputs)
        # can be removed in future (with adapting the tests) except 'cut integration part' block

        def _prepare_integration_select(node, is_table, is_target, parent_query, **kwargs):
            if not isinstance(node, Identifier):
                return

            # cut integration part: quoted parts compare case-sensitively,
            # unquoted parts compare lower-cased
            if len(node.parts) > 1:
                if (node.is_quoted[0] and node.parts[0] == database) or (
                    not node.is_quoted[0] and node.parts[0].lower() == database.lower()
                ):
                    node.parts.pop(0)
                    node.is_quoted.pop(0)

            if not hasattr(parent_query, "from_table"):
                return

            table = parent_query.from_table
            if not is_table:
                # add table name or alias for identifiers
                if isinstance(table, Join):
                    # skip for join
                    return

                # keep column name for target
                if is_target:
                    if node.alias is None:
                        last_part = node.parts[-1]
                        # NOTE(review): non-str last parts (e.g. Star) are skipped — confirm
                        if isinstance(last_part, str):
                            node.alias = Identifier(parts=[node.parts[-1]])

        query_traversal(query, _prepare_integration_select)
    def get_integration_select_step(self, select: Select, params: dict = None) -> PlanStep:
        """
        Generate planner step to execute query over integration or over results of previous step (if it is CTE)
        """
        if isinstance(select.from_table, NativeQuery):
            integration_name = select.from_table.integration.parts[-1]
        else:
            integration_name, table = self.resolve_database_table(select.from_table)

            # is it CTE?
            table_name = table_alias = _resolve_identifier_part(table)
            if table.alias is not None:
                table_alias = _resolve_identifier_part(table.alias)

            if integration_name == self.default_namespace and table_name in self.cte_results:
                # select over the already-materialized CTE result instead of fetching
                select.from_table = None
                return SubSelectStep(select, self.cte_results[table_name], table_name=table_alias)

        # work on a copy: prepare_integration_select mutates the query in place
        fetch_df_select = copy.deepcopy(select)
        self.prepare_integration_select(integration_name, fetch_df_select)

        # remove predictor params
        if fetch_df_select.using is not None:
            fetch_df_select.using = None
        fetch_params = self.get_fetch_params(params)
        return FetchDataframeStep(integration=integration_name, query=fetch_df_select, params=fetch_params)
249 def get_fetch_params(self, params):
250 # extracts parameters for fetching
252 if params:
253 fetch_params = params.copy()
254 # remove partition parameters
255 for key in ("batch_size", "track_column"):
256 if key in params:
257 del params[key]
258 if "track_column" in fetch_params and isinstance(fetch_params["track_column"], Identifier):
259 fetch_params["track_column"] = fetch_params["track_column"].parts[-1]
260 else:
261 fetch_params = None
262 return fetch_params
264 def plan_integration_select(self, select):
265 """Plan for a select query that can be fully executed in an integration"""
267 return self.plan.add_step(self.get_integration_select_step(select, params=select.using))
    def resolve_database_table(self, node: Identifier):
        """Resolve an identifier into (database name, table identifier).

        The first identifier part is consumed as the database when it matches
        a known database; otherwise the default namespace is used.

        Raises:
            PlanningException: when no database can be resolved.
        """
        parts = node.parts.copy()
        alias = None
        if node.alias is not None:
            alias = node.alias.copy()

        database = self.default_namespace

        err_msg_suffix = ""
        if len(parts) > 1:
            # if not quoted check in lower case
            part = parts[0]
            if part not in self.databases and not node.is_quoted[0]:
                part = part.lower()

            if part in self.databases:
                database = part
                parts.pop(0)
            else:
                # unknown prefix: fall through to the default namespace,
                # but remember the reason in case that is also missing
                err_msg_suffix = f"'{parts[0].lower()}' is not valid database name."

        if database is None:
            raise PlanningException(
                f"Invalid or missing database name for identifier '{node}'. {err_msg_suffix}\n"
                "Query must include a valid database name prefix in format: 'database_name.table_name' or 'database_name.schema_name.table_name'"
            )

        return database, Identifier(parts=parts, alias=alias)
    def get_query_info(self, query):
        """Collect the objects a query references.

        Returns a dict with:
            mdb_entities: tables living in MindsDB (projects, native queries, data),
            integrations: names of external data integrations referenced,
            predictors: predictor identifiers referenced as tables,
            user_functions: functions with a namespace or built-in MindsDB functions.
        """
        # get all predictors
        mdb_entities = []
        predictors = []
        user_functions = []
        # projects = set()
        integrations = set()

        def find_objects(node, is_table, **kwargs):
            if isinstance(node, Function):
                if node.namespace is not None or node.op.lower() in MINDSDB_SQL_FUNCTIONS:
                    user_functions.append(node)

            if is_table:
                if isinstance(node, ast.Identifier):
                    integration, _ = self.resolve_database_table(node)

                    if self.is_predictor(node):
                        predictors.append(node)

                    if integration in self.projects:
                        # it is project
                        mdb_entities.append(node)
                    elif integration is not None:
                        integrations.add(integration)
                if isinstance(node, ast.NativeQuery) or isinstance(node, ast.Data):
                    mdb_entities.append(node)

        query_traversal(query, find_objects)

        # cte names are not mdb objects
        if isinstance(query, Select) and query.cte:
            cte_names = [cte.name.parts[-1] for cte in query.cte]
            mdb_entities = [item for item in mdb_entities if ".".join(item.parts) not in cte_names]

        return {
            "mdb_entities": mdb_entities,
            "integrations": integrations,
            "predictors": predictors,
            "user_functions": user_functions,
        }
343 def get_nested_selects_plan_fnc(self, main_integration, force=False):
344 # returns function for traversal over query and inject fetch data query instead of subselects
345 def find_selects(node, **kwargs):
346 if isinstance(node, Select):
347 query_info2 = self.get_query_info(node)
348 if force or (
349 len(query_info2["integrations"]) > 1
350 or main_integration not in query_info2["integrations"]
351 or len(query_info2["mdb_entities"]) > 0
352 ):
353 # need to execute in planner
355 node.parentheses = False
356 last_step = self.plan_select(node)
358 node2 = Parameter(last_step.result)
360 return node2
362 return find_selects
364 def plan_select_identifier(self, query):
365 # query_info = self.get_query_info(query)
366 #
367 # if len(query_info['integrations']) == 0 and len(query_info['predictors']) >= 1:
368 # # select from predictor
369 # return self.plan_select_from_predictor(query)
370 # elif (
371 # len(query_info['integrations']) == 1
372 # and len(query_info['mdb_entities']) == 0
373 # and len(query_info['user_functions']) == 0
374 # ):
375 #
376 # int_name = list(query_info['integrations'])[0]
377 # if self.integrations.get(int_name, {}).get('class_type') != 'api':
378 # # one integration without predictors, send all query to integration
379 # return self.plan_integration_select(query)
381 # find subselects
382 main_integration, _ = self.resolve_database_table(query.from_table)
383 is_api_db = self.integrations.get(main_integration, {}).get("class_type") == "api"
385 find_selects = self.get_nested_selects_plan_fnc(main_integration, force=is_api_db)
386 query.targets = query_traversal(query.targets, find_selects)
387 query_traversal(query.where, find_selects)
389 # get info of updated query
390 query_info = self.get_query_info(query)
392 if len(query_info["predictors"]) >= 1:
393 # select from predictor
394 return self.plan_select_from_predictor(query)
395 elif is_api_db:
396 return self.plan_api_db_select(query)
397 elif len(query_info["user_functions"]) > 0:
398 return self.plan_integration_select_with_functions(query)
399 else:
400 # fallback to integration
401 return self.plan_integration_select(query)
    def plan_integration_select_with_functions(self, query):
        """Plan a select that contains user/MindsDB functions.

        Functions cannot run inside the integration, so the query is split:
        a stripped copy (query2) is pushed down to the integration, and the
        original query is applied afterwards as a sub-select.
        """
        # UDF can't be aggregate function: it means we have to do aggregation after function execution
        # - remove targets from query
        # - add subselect with targets

        # replace functions in conditions
        query2 = query.copy()

        skipped_conditions = []

        def replace_functions(node, **kwargs):
            if not isinstance(node, BinaryOperation):
                return
            arg1, arg2 = node.args
            if not isinstance(arg1, Function):
                arg1 = arg2
            if not isinstance(arg1, Function):
                return

            # user defined
            if arg1.namespace is not None:
                # clear: neutralize the condition in the pushed-down copy
                # (it will still be evaluated by the outer query)
                skipped_conditions.append(node)
                node.args = [Constant(0), Constant(0)]
                node.op = "="

        query_traversal(query2.where, replace_functions)

        query2.targets = [Star()]

        # don't do aggregate
        query2.having = None

        if query.group_by is not None:
            # if aggregation exists, do order and limit in subquery
            query2.group_by = None
            query2.order_by = None
            query2.limit = None
        else:
            # no aggregation: order/limit were already pushed down in query2
            query.order_by = None
            query.limit = None

        # if all conditions were executed - clear it
        if len(skipped_conditions) == 0:
            query.where = None

        prev_step = self.plan_integration_select(query2)

        return self.plan_sub_select(query, prev_step)
455 def plan_api_db_select(self, query):
456 # split to select from api database
457 # keep only limit and where
458 # the rest goes to outer select
459 query2 = Select(
460 targets=query.targets,
461 from_table=query.from_table,
462 where=query.where,
463 order_by=query.order_by,
464 limit=query.limit,
465 )
466 prev_step = self.plan_integration_select(query2)
468 # clear limit and where
469 query.limit = None
470 query.where = None
471 return self.plan_sub_select(query, prev_step)
473 def plan_nested_select(self, select):
474 # query_info = self.get_query_info(select)
475 # # get all predictors
476 #
477 # if (
478 # len(query_info['mdb_entities']) == 0
479 # and len(query_info['integrations']) == 1
480 # and len(query_info['user_functions']) == 0
481 # and 'files' not in query_info['integrations']
482 # and 'views' not in query_info['integrations']
483 # ):
484 # int_name = list(query_info['integrations'])[0]
485 # if self.integrations.get(int_name, {}).get('class_type') != 'api':
486 #
487 # # if no predictor inside = run as is
488 # return self.plan_integration_nested_select(select, int_name)
490 return self.plan_mdb_nested_select(select)
492 def plan_mdb_nested_select(self, select):
493 # plan nested select
495 select2 = copy.deepcopy(select.from_table)
496 select2.parentheses = False
497 select2.alias = None
498 self.plan_select(select2)
499 last_step = self.plan.steps[-1]
501 return self.plan_sub_select(select, last_step)
503 def get_predictor_namespace_and_name_from_identifier(self, identifier):
504 new_identifier = copy.deepcopy(identifier)
506 info = self.get_predictor(identifier)
507 namespace = info["integration_name"]
509 parts = [namespace, info["name"]]
510 if info["version"] is not None:
511 parts.append(info["version"])
512 new_identifier.parts = parts
514 return namespace, new_identifier
    def plan_select_from_predictor(self, select):
        """Plan a SELECT whose FROM clause is a predictor (model).

        The WHERE clause supplies the input row; only WHERE (plus USING)
        is supported when querying a predictor directly.

        Raises:
            PlanningException: on unsupported targets, GROUP BY/HAVING,
                or a missing WHERE clause.
        """
        predictor_namespace, predictor = self.get_predictor_namespace_and_name_from_identifier(select.from_table)

        if select.where == BinaryOperation("=", args=[Constant(1), Constant(0)]):
            # Hardcoded mysql way of getting predictor columns
            predictor_identifier = utils.get_predictor_name_identifier(predictor)
            predictor_step = self.plan.add_step(
                GetPredictorColumns(namespace=predictor_namespace, predictor=predictor_identifier)
            )
        else:
            # NOTE(review): new_query_targets is built for validation only and
            # never used afterwards — confirm this is intentional
            new_query_targets = []
            for target in select.targets:
                if isinstance(target, Identifier):
                    new_query_targets.append(disambiguate_predictor_column_identifier(target, predictor))
                elif type(target) in (Star, Constant, Function):
                    new_query_targets.append(target)
                else:
                    raise PlanningException(f"Unknown select target {type(target)}")

            if select.group_by or select.having:
                raise PlanningException(
                    "Unsupported operation when querying predictor. Only WHERE is allowed and required."
                )

            # extract the model input row from the WHERE clause
            row_dict = {}
            where_clause = select.where
            if not where_clause:
                raise PlanningException("WHERE clause required when selecting from predictor")

            predictor_identifier = utils.get_predictor_name_identifier(predictor)
            recursively_extract_column_values(where_clause, row_dict, predictor_identifier)

            params = None
            if select.using is not None:
                params = select.using
            predictor_step = self.plan.add_step(
                ApplyPredictorRowStep(
                    namespace=predictor_namespace, predictor=predictor_identifier, row_dict=row_dict, params=params
                )
            )
        project_step = self.plan_project(select, predictor_step.result)
        return project_step
    def plan_predictor(self, query, table, predictor_namespace, predictor):
        """Plan applying *predictor* to data fetched from *table*.

        Splits WHERE conditions between the source table and the model,
        fetches the table data, then applies the predictor to it.

        Returns:
            dict: {'predictor': ApplyPredictorStep, 'data': fetch step}.
        """
        int_select = copy.deepcopy(query)
        int_select.targets = [Star()]  # TODO why not query.targets?
        int_select.from_table = table

        predictor_alias = None
        if predictor.alias is not None:
            predictor_alias = predictor.alias.parts[0]

        params = {}
        if query.using is not None:
            params = query.using

        binary_ops = []
        table_filters = []
        model_filters = []

        def split_filters(node, **kwargs):
            # split conditions between model and table
            if isinstance(node, BinaryOperation):
                op = node.op.lower()
                binary_ops.append(op)

                if op in ["and", "or"]:
                    return

                # normalize so the identifier (if any) is in arg1
                arg1, arg2 = node.args
                if not isinstance(arg1, Identifier):
                    arg1, arg2 = arg2, arg1

                if isinstance(arg1, Identifier) and isinstance(arg2, (Constant, Parameter)) and len(arg1.parts) > 1:
                    model = Identifier(parts=arg1.parts[:-1])

                    # condition addresses the predictor (by name or alias)
                    if self.is_predictor(model) or (len(model.parts) == 1 and model.parts[0] == predictor_alias):
                        model_filters.append(node)
                        return
                table_filters.append(node)

        # find subselects
        main_integration, _ = self.resolve_database_table(table)
        find_selects = self.get_nested_selects_plan_fnc(main_integration, force=True)
        query_traversal(int_select.where, find_selects)

        # split conditions
        query_traversal(int_select.where, split_filters)

        # only safe to drop model filters from the fetch when they are not
        # entangled with OR logic
        if len(model_filters) > 0 and "or" not in binary_ops:
            int_select.where = filters_to_bin_op(table_filters)

        integration_select_step = self.plan_integration_select(int_select)

        predictor_identifier = utils.get_predictor_name_identifier(predictor)

        if len(params) == 0:
            params = None

        # turn 'model.column = constant' filters into the predictor input row
        row_dict = None
        if model_filters:
            row_dict = {}
            for el in model_filters:
                if isinstance(el.args[0], Identifier) and el.op == "=":
                    if isinstance(el.args[1], (Constant, Parameter)):
                        row_dict[el.args[0].parts[-1]] = el.args[1].value

        last_step = self.plan.add_step(
            ApplyPredictorStep(
                namespace=predictor_namespace,
                dataframe=integration_select_step.result,
                predictor=predictor_identifier,
                params=params,
                row_dict=row_dict,
            )
        )

        return {
            "predictor": last_step,
            "data": integration_select_step,
        }
640 # def plan_group(self, query, last_step):
641 # # ! is not using yet
642 #
643 # # check group
644 # funcs = []
645 # for t in query.targets:
646 # if isinstance(t, Function):
647 # funcs.append(t.op.lower())
648 # agg_funcs = ['sum', 'min', 'max', 'avg', 'count', 'std']
649 #
650 # if (
651 # query.having is not None
652 # or query.group_by is not None
653 # or set(agg_funcs) & set(funcs)
654 # ):
655 # # is aggregate
656 # group_by_targets = []
657 # for t in query.targets:
658 # target_copy = copy.deepcopy(t)
659 # group_by_targets.append(target_copy)
660 # # last_step = self.plan.steps[-1]
661 # return GroupByStep(dataframe=last_step.result, columns=query.group_by, targets=group_by_targets)
663 def plan_project(self, query, dataframe, ignore_doubles=False):
664 out_identifiers = []
666 if len(query.targets) == 1 and isinstance(query.targets[0], Star):
667 last_step = self.plan.steps[-1]
668 return last_step
670 for target in query.targets:
671 if ( 671 ↛ 680line 671 didn't jump to line 680 because the condition on line 671 was always true
672 isinstance(target, Identifier)
673 or isinstance(target, Star)
674 or isinstance(target, Function)
675 or isinstance(target, Constant)
676 or isinstance(target, BinaryOperation)
677 ):
678 out_identifiers.append(target)
679 else:
680 new_identifier = Identifier(str(target.to_string(alias=False)), alias=target.alias)
681 out_identifiers.append(new_identifier)
682 return self.plan.add_step(
683 ProjectStep(dataframe=dataframe, columns=out_identifiers, ignore_doubles=ignore_doubles)
684 )
686 def plan_create_table(self, query: CreateTable):
687 if query.from_select is None:
688 if query.columns is not None: 688 ↛ 698line 688 didn't jump to line 698 because the condition on line 688 was always true
689 self.plan.add_step(
690 CreateTableStep(
691 table=query.name,
692 columns=query.columns,
693 is_replace=query.is_replace,
694 )
695 )
696 return
698 raise PlanningException(f'Not implemented "create table": {query.to_string()}')
700 integration_name = query.name.parts[0]
702 last_step = self.plan_select(query.from_select, integration=integration_name)
704 # create table step
705 self.plan.add_step(
706 SaveToTable(
707 table=query.name,
708 dataframe=last_step.result,
709 is_replace=query.is_replace,
710 )
711 )
713 def plan_insert(self, query):
714 table = query.table
715 if query.from_select is not None: 715 ↛ 737line 715 didn't jump to line 737 because the condition on line 715 was always true
716 integration_name = query.table.parts[0]
718 # plan sub-select first
719 last_step = self.plan_select(query.from_select, integration=integration_name)
721 # possible knowledge base parameters
722 select = query.from_select
723 params = {}
724 if isinstance(select, Select) and select.using is not None:
725 for k, v in select.using.items(): 725 ↛ 726line 725 didn't jump to line 726 because the loop on line 725 never started
726 if k.startswith("kb_"):
727 params[k] = v
729 self.plan.add_step(
730 InsertToTable(
731 table=table,
732 dataframe=last_step.result,
733 params=params,
734 )
735 )
736 else:
737 self.plan.add_step(
738 InsertToTable(
739 table=table,
740 query=query,
741 )
742 )
    def plan_update(self, query):
        """Plan UPDATE, optionally sourcing values from a sub-select."""
        last_step = None
        if query.from_select is not None:
            integration_name = query.table.parts[0]
            # plan sub-select first
            last_step = self.plan_select(query.from_select, integration=integration_name)

        update_command = copy.deepcopy(query)
        # clear subselect
        update_command.from_select = None

        table = query.table
        # NOTE(review): passes the step object itself (not last_step.result),
        # unlike InsertToTable — confirm UpdateToTable expects the step here
        self.plan.add_step(UpdateToTable(table=table, dataframe=last_step, update_command=update_command))
758 def plan_delete(self, query: Delete):
759 # find subselects
760 main_integration, _ = self.resolve_database_table(query.table)
762 is_api_db = self.integrations.get(main_integration, {}).get("class_type") == "api"
764 find_selects = self.get_nested_selects_plan_fnc(main_integration, force=is_api_db)
765 query_traversal(query.where, find_selects)
767 self.prepare_integration_select(main_integration, query.where)
769 return self.plan.add_step(DeleteStep(table=query.table, where=query.where))
771 def plan_cte(self, query):
772 for cte in query.cte:
773 step = self.plan_select(cte.query)
774 name = _resolve_identifier_part(cte.name)
775 self.cte_results[name] = step.result
777 def check_single_integration(self, query):
778 query_info = self.get_query_info(query)
780 # can we send all query to integration?
782 # one integration and not mindsdb objects in query
783 if (
784 len(query_info["mdb_entities"]) == 0
785 and len(query_info["integrations"]) == 1
786 and "files" not in query_info["integrations"]
787 and "views" not in query_info["integrations"]
788 and len(query_info["user_functions"]) == 0
789 ):
790 int_name = list(query_info["integrations"])[0]
791 # if is sql database
792 if self.integrations.get(int_name, {}).get("class_type") != "api":
793 # send to this integration
794 self.prepare_integration_select(int_name, query)
796 last_step = self.plan.add_step(FetchDataframeStep(integration=int_name, query=query))
797 return last_step
    def plan_select(self, query, integration=None):
        """Dispatch planning of a SELECT (or set operation) by FROM-clause type."""
        if isinstance(query, (Union, Except, Intersect)):
            return self.plan_union(query, integration=integration)

        if query.cte is not None:
            self.plan_cte(query)

        from_table = query.from_table

        if isinstance(from_table, Identifier):
            return self.plan_select_identifier(query)
        elif isinstance(from_table, Select):
            return self.plan_nested_select(query)
        elif isinstance(from_table, Join):
            plan_join = PlanJoin(self)
            return plan_join.plan(query, integration)
        elif isinstance(from_table, NativeQuery):
            # NOTE(review): uses parts[0] while get_integration_select_step
            # uses parts[-1] for NativeQuery — confirm this is intentional
            integration = from_table.integration.parts[0].lower()
            step = FetchDataframeStep(integration=integration, raw_query=from_table.query)
            last_step = self.plan.add_step(step)
            return self.plan_sub_select(query, last_step)

        elif isinstance(from_table, ast.Data):
            step = DataStep(from_table.data)
            last_step = self.plan.add_step(step)
            return self.plan_sub_select(query, last_step, add_absent_cols=True)
        elif from_table is None:
            # one line select
            step = QueryStep(query, from_table=pd.DataFrame([None]))
            return self.plan.add_step(step)
        else:
            raise PlanningException(f"Unsupported from_table {type(from_table)}")
832 def plan_sub_select(self, query, prev_step, add_absent_cols=False):
833 if (
834 query.group_by is not None
835 or query.order_by is not None
836 or query.having is not None
837 or query.distinct is True
838 or query.where is not None
839 or query.limit is not None
840 or query.offset is not None
841 or len(query.targets) != 1
842 or not isinstance(query.targets[0], Star)
843 ):
844 if query.from_table.alias is not None:
845 table_name = query.from_table.alias.parts[-1]
846 elif isinstance(query.from_table, Identifier):
847 table_name = query.from_table.parts[-1]
848 else:
849 table_name = None
851 query2 = copy.deepcopy(query)
852 query2.from_table = None
853 sup_select = SubSelectStep(query2, prev_step.result, table_name=table_name, add_absent_cols=add_absent_cols)
854 self.plan.add_step(sup_select)
855 return sup_select
856 return prev_step
858 def plan_union(self, query, integration=None):
859 step1 = self.plan_select(query.left, integration=integration)
860 step2 = self.plan_select(query.right, integration=integration)
861 operation = "union"
862 if isinstance(query, Except): 862 ↛ 863line 862 didn't jump to line 863 because the condition on line 862 was never true
863 operation = "except"
864 elif isinstance(query, Intersect): 864 ↛ 865line 864 didn't jump to line 865 because the condition on line 864 was never true
865 operation = "intersect"
867 return self.plan.add_step(
868 UnionStep(left=step1.result, right=step2.result, unique=query.unique, operation=operation)
869 )
871 # method for compatibility
    def from_query(self, query=None):
        """Build and return the full QueryPlan for *query* (or self.query).

        Raises:
            PlanningException: for unsupported statement types.
        """
        self.plan = QueryPlan()

        if query is None:
            query = self.query

        if isinstance(query, (Select, Union, Except, Intersect)):
            # fast path: whole query fits into a single SQL integration
            if self.check_single_integration(query):
                return self.plan
            self.plan_select(query)
        elif isinstance(query, CreateTable):
            self.plan_create_table(query)
        elif isinstance(query, Insert):
            self.plan_insert(query)
        elif isinstance(query, Update):
            self.plan_update(query)
        elif isinstance(query, Delete):
            self.plan_delete(query)
        else:
            raise PlanningException(f"Unsupported query type {type(query)}")

        # rewrite the plan for partitioned (batched) fetching when possible
        plan = self.handle_partitioning(self.plan)

        return plan
    def handle_partitioning(self, plan: QueryPlan) -> QueryPlan:
        """
        If plan has fetching in partitions:
        try to rebuild plan to send fetched chunk of data through the following steps, if it is possible
        """
        # handle fetchdataframe partitioning
        steps_in = plan.steps
        steps_out = []

        step = None
        # the currently open partitioned-fetch step; subsequent compatible
        # steps are nested inside it
        partition_step = None
        for step in steps_in:
            if isinstance(step, FetchDataframeStep) and step.params is not None:
                batch_size = step.params.get("batch_size")
                if batch_size is not None:
                    # found batched fetch: replace with a partitioned fetch step
                    partition_step = FetchDataframeStepPartition(
                        step_num=step.step_num,
                        integration=step.integration,
                        query=step.query,
                        raw_query=step.raw_query,
                        params=step.params,
                    )
                    steps_out.append(partition_step)
                    # mark plan
                    plan.is_resumable = True
                    continue
                else:
                    step.params = None

            if partition_step is not None:
                # check and add step into partition

                can_be_partitioned = False
                if isinstance(step, (JoinStep, ApplyPredictorStep, InsertToTable)):
                    can_be_partitioned = True
                elif isinstance(step, QueryStep):
                    query = step.query
                    # only row-wise queries can run per-chunk
                    if (
                        query.group_by is None
                        and query.order_by is None
                        and query.distinct is False
                        and query.limit is None
                        and query.offset is None
                    ):
                        no_identifiers = [
                            target for target in step.query.targets if not isinstance(target, (Star, Identifier))
                        ]
                        if len(no_identifiers) == 0:
                            can_be_partitioned = True

                if not can_be_partitioned:
                    if len(partition_step.steps) == 0:
                        # Nothing can be partitioned, failback to old plan
                        plan.is_resumable = False
                        return plan
                    # close the partition; this step runs after it, unpartitioned
                    partition_step = None
                else:
                    partition_step.steps.append(step)
                    continue

            steps_out.append(step)

        # 'step' is the last step of the plan here
        if plan.is_resumable and isinstance(step, InsertToTable):
            plan.is_async = True
        else:
            # special case: register insert from select (it is the same as mark resumable)
            if (
                len(steps_in) == 2
                and isinstance(steps_in[0], FetchDataframeStep)
                and isinstance(steps_in[1], InsertToTable)
            ):
                plan.is_resumable = True

        plan.steps = steps_out
        return plan
975 def prepare_steps(self, query):
976 statement_planner = PreparedStatementPlanner(self)
978 # return generator
979 return statement_planner.prepare_steps(query)
981 def execute_steps(self, params=None):
982 statement_planner = PreparedStatementPlanner(self)
984 # return generator
985 return statement_planner.execute_steps(params)
987 # def fetch(self, row_count):
988 # statement_planner = PreparedStatementPlanner(self)
989 # return statement_planner.fetch(row_count)
990 #
991 # def close(self):
992 # statement_planner = PreparedStatementPlanner(self)
993 # return statement_planner.close()
995 def get_statement_info(self):
996 statement_planner = PreparedStatementPlanner(self)
998 return statement_planner.get_statement_info()