Coverage for mindsdb / api / executor / planner / query_planner.py: 84%

504 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import copy 

2 

3import pandas as pd 

4 

5from mindsdb_sql_parser import ast 

6from mindsdb_sql_parser.ast import ( 

7 Select, 

8 Identifier, 

9 Join, 

10 Star, 

11 BinaryOperation, 

12 Constant, 

13 Union, 

14 CreateTable, 

15 Function, 

16 Insert, 

17 Except, 

18 Intersect, 

19 Update, 

20 NativeQuery, 

21 Parameter, 

22 Delete, 

23) 

24 

25from mindsdb.api.executor.planner.exceptions import PlanningException 

26from mindsdb.api.executor.planner import utils 

27from mindsdb.api.executor.planner.query_plan import QueryPlan 

28from mindsdb.api.executor.planner.steps import ( 

29 PlanStep, 

30 FetchDataframeStep, 

31 ProjectStep, 

32 ApplyPredictorStep, 

33 ApplyPredictorRowStep, 

34 UnionStep, 

35 GetPredictorColumns, 

36 SaveToTable, 

37 InsertToTable, 

38 UpdateToTable, 

39 SubSelectStep, 

40 QueryStep, 

41 JoinStep, 

42 DeleteStep, 

43 DataStep, 

44 CreateTableStep, 

45 FetchDataframeStepPartition, 

46) 

47from mindsdb.api.executor.planner.utils import ( 

48 disambiguate_predictor_column_identifier, 

49 recursively_extract_column_values, 

50 query_traversal, 

51 filters_to_bin_op, 

52) 

53from mindsdb.api.executor.planner.plan_join import PlanJoin 

54from mindsdb.api.executor.planner.query_prepare import PreparedStatementPlanner 

55from mindsdb.utilities.config import config 

56 

57 

# Project used as the implicit namespace when a query does not specify one.
default_project = config.get("default_project")

# This includes built-in MindsDB SQL functions and functions to be executed via DuckDB consistently.
MINDSDB_SQL_FUNCTIONS = {"llm", "to_markdown", "hash"}

62 

63 

64def _resolve_identifier_part(identifier: Identifier, part: int = -1) -> str: 

65 """Resolve a part of an identifier. 

66 

67 Args: 

68 identifier (Identifier): The identifier to resolve the part of. 

69 part (int): The part number to resolve. 

70 

71 Returns: 

72 str: part of the identifier in lowercase if not quoted, otherwise the part itself. 

73 """ 

74 name = identifier.parts[part] 

75 is_quoted = identifier.is_quoted[part] 

76 if not is_quoted: 76 ↛ 78line 76 didn't jump to line 78 because the condition on line 76 was always true

77 name = name.lower() 

78 return name 

79 

80 

81class QueryPlanner: 

    def __init__(
        self,
        query=None,
        integrations: list = None,
        predictor_namespace=None,
        predictor_metadata: list = None,
        default_namespace: str = None,
    ):
        """Build planner state from the available integrations and predictor metadata.

        Args:
            query: default query to plan (used by `from_query` when called without one).
            integrations: list of integration names (str) or dicts with at least
                'name' and 'type' keys; non-'data' entries are treated as projects.
            predictor_namespace: legacy override for the project owning predictors.
            predictor_metadata: list of predictor dicts, or legacy {name: dict} mapping.
            default_namespace: database assumed for identifiers with no database prefix.
        """
        self.query = query
        self.plan = QueryPlan()

        _projects = set()
        self.integrations = {}
        if integrations is not None:
            for integration in integrations:
                if isinstance(integration, dict):
                    integration_name = integration["name"]
                    # it is project of system database
                    if integration["type"] != "data":
                        _projects.add(integration_name)
                        continue
                else:
                    # bare string: wrap into a minimal metadata dict
                    integration_name = integration
                    integration = {"name": integration}
                self.integrations[integration_name] = integration

        # allow to select from mindsdb namespace
        _projects.add(default_project)

        self.default_namespace = default_namespace

        # legacy parameter
        self.predictor_namespace = predictor_namespace.lower() if predictor_namespace else default_project

        # map for lower names of predictors

        self.predictor_info = {}
        if isinstance(predictor_metadata, list):
            # convert to dict
            for predictor in predictor_metadata:
                if "integration_name" in predictor:
                    integration_name = predictor["integration_name"]
                else:
                    integration_name = self.predictor_namespace
                    predictor["integration_name"] = integration_name
                # lookup key is always '<integration>.<name>' in lowercase
                idx = f"{integration_name}.{predictor['name']}".lower()
                self.predictor_info[idx] = predictor
                _projects.add(integration_name.lower())
        elif isinstance(predictor_metadata, dict):
            # legacy behaviour
            for name, predictor in predictor_metadata.items():
                if "." not in name:
                    # un-prefixed names get the predictor namespace prepended
                    if "integration_name" in predictor:
                        integration_name = predictor["integration_name"]
                    else:
                        integration_name = self.predictor_namespace
                        predictor["integration_name"] = integration_name
                    name = f"{integration_name}.{name}".lower()
                    _projects.add(integration_name.lower())

                self.predictor_info[name] = predictor

        self.projects = list(_projects)
        # every name a table identifier may legitimately be prefixed with
        self.databases = list(self.integrations.keys()) + self.projects

        self.statement = None

        # CTE name -> step result, filled by plan_cte and consumed by
        # get_integration_select_step
        self.cte_results = {}

150 

151 def is_predictor(self, identifier): 

152 if not isinstance(identifier, Identifier): 

153 return False 

154 return self.get_predictor(identifier) is not None 

155 

156 def get_predictor(self, identifier): 

157 name_parts = list(identifier.parts) 

158 

159 version = None 

160 if len(name_parts) > 1 and name_parts[-1].isdigit(): 

161 # last part is version 

162 version = name_parts[-1] 

163 name_parts = name_parts[:-1] 

164 

165 name = name_parts[-1] 

166 

167 namespace = None 

168 if len(name_parts) > 1: 

169 namespace = name_parts[-2] 

170 else: 

171 if self.default_namespace is not None: 

172 namespace = self.default_namespace 

173 

174 idx_ar = [name] 

175 if namespace is not None: 

176 idx_ar.insert(0, namespace) 

177 

178 idx = ".".join(idx_ar).lower() 

179 info = self.predictor_info.get(idx) 

180 if info is not None: 

181 info["version"] = version 

182 info["name"] = name 

183 return info 

184 

    def prepare_integration_select(self, database, query):
        """Normalize identifiers in *query* before sending it to *database*.

        Strips the integration-name prefix from identifiers and aliases plain
        target columns with their own name.
        """
        # replacement for 'utils.recursively_disambiguate_*' functions from utils
        # main purpose: make tests working (don't change planner outputs)
        # can be removed in future (with adapting the tests) except 'cut integration part' block

        def _prepare_integration_select(node, is_table, is_target, parent_query, **kwargs):
            # callback applied to every node by query_traversal below
            if not isinstance(node, Identifier):
                return

            # cut integration part
            if len(node.parts) > 1:
                # quoted prefixes must match exactly; unquoted ones case-insensitively
                if (node.is_quoted[0] and node.parts[0] == database) or (
                    not node.is_quoted[0] and node.parts[0].lower() == database.lower()
                ):
                    node.parts.pop(0)
                    node.is_quoted.pop(0)

            if not hasattr(parent_query, "from_table"):
                return

            table = parent_query.from_table
            if not is_table:
                # add table name or alias for identifiers
                if isinstance(table, Join):
                    # skip for join
                    return

                # keep column name for target
                if is_target:
                    if node.alias is None:
                        last_part = node.parts[-1]
                        if isinstance(last_part, str):
                            node.alias = Identifier(parts=[node.parts[-1]])

        query_traversal(query, _prepare_integration_select)

220 

    def get_integration_select_step(self, select: Select, params: dict = None) -> PlanStep:
        """
        Generate planner step to execute query over integration or over results of previous step (if it is CTE)
        """

        if isinstance(select.from_table, NativeQuery):
            integration_name = select.from_table.integration.parts[-1]
        else:
            integration_name, table = self.resolve_database_table(select.from_table)

            # is it CTE?
            table_name = table_alias = _resolve_identifier_part(table)
            if table.alias is not None:
                table_alias = _resolve_identifier_part(table.alias)

            if integration_name == self.default_namespace and table_name in self.cte_results:
                # select over a previously planned CTE result instead of fetching
                select.from_table = None
                return SubSelectStep(select, self.cte_results[table_name], table_name=table_alias)

        # deep copy: prepare_integration_select mutates identifiers in place
        fetch_df_select = copy.deepcopy(select)
        self.prepare_integration_select(integration_name, fetch_df_select)

        # remove predictor params
        if fetch_df_select.using is not None:
            fetch_df_select.using = None
        fetch_params = self.get_fetch_params(params)
        return FetchDataframeStep(integration=integration_name, query=fetch_df_select, params=fetch_params)

248 

249 def get_fetch_params(self, params): 

250 # extracts parameters for fetching 

251 

252 if params: 

253 fetch_params = params.copy() 

254 # remove partition parameters 

255 for key in ("batch_size", "track_column"): 

256 if key in params: 

257 del params[key] 

258 if "track_column" in fetch_params and isinstance(fetch_params["track_column"], Identifier): 

259 fetch_params["track_column"] = fetch_params["track_column"].parts[-1] 

260 else: 

261 fetch_params = None 

262 return fetch_params 

263 

264 def plan_integration_select(self, select): 

265 """Plan for a select query that can be fully executed in an integration""" 

266 

267 return self.plan.add_step(self.get_integration_select_step(select, params=select.using)) 

268 

269 def resolve_database_table(self, node: Identifier): 

270 # resolves integration name and table name 

271 

272 parts = node.parts.copy() 

273 alias = None 

274 if node.alias is not None: 

275 alias = node.alias.copy() 

276 

277 database = self.default_namespace 

278 

279 err_msg_suffix = "" 

280 if len(parts) > 1: 

281 # if not quoted check in lower case 

282 part = parts[0] 

283 if part not in self.databases and not node.is_quoted[0]: 

284 part = part.lower() 

285 

286 if part in self.databases: 

287 database = part 

288 parts.pop(0) 

289 else: 

290 err_msg_suffix = f"'{parts[0].lower()}' is not valid database name." 

291 

292 if database is None: 

293 raise PlanningException( 

294 f"Invalid or missing database name for identifier '{node}'. {err_msg_suffix}\n" 

295 "Query must include a valid database name prefix in format: 'database_name.table_name' or 'database_name.schema_name.table_name'" 

296 ) 

297 

298 return database, Identifier(parts=parts, alias=alias) 

299 

    def get_query_info(self, query):
        """Collect what *query* touches: predictors, mindsdb entities,
        data integrations and user/built-in functions.

        Returns a dict with keys 'mdb_entities', 'integrations',
        'predictors', 'user_functions'.
        """
        # get all predictors
        mdb_entities = []
        predictors = []
        user_functions = []
        # projects = set()
        integrations = set()

        def find_objects(node, is_table, **kwargs):
            # traversal callback: classify functions and table references
            if isinstance(node, Function):
                if node.namespace is not None or node.op.lower() in MINDSDB_SQL_FUNCTIONS:
                    user_functions.append(node)

            if is_table:
                if isinstance(node, ast.Identifier):
                    integration, _ = self.resolve_database_table(node)

                    if self.is_predictor(node):
                        predictors.append(node)

                    if integration in self.projects:
                        # it is project
                        mdb_entities.append(node)

                    elif integration is not None:
                        integrations.add(integration)
                if isinstance(node, ast.NativeQuery) or isinstance(node, ast.Data):
                    mdb_entities.append(node)

        query_traversal(query, find_objects)

        # cte names are not mdb objects
        if isinstance(query, Select) and query.cte:
            cte_names = [cte.name.parts[-1] for cte in query.cte]
            mdb_entities = [item for item in mdb_entities if ".".join(item.parts) not in cte_names]

        return {
            "mdb_entities": mdb_entities,
            "integrations": integrations,
            "predictors": predictors,
            "user_functions": user_functions,
        }

342 

343 def get_nested_selects_plan_fnc(self, main_integration, force=False): 

344 # returns function for traversal over query and inject fetch data query instead of subselects 

345 def find_selects(node, **kwargs): 

346 if isinstance(node, Select): 

347 query_info2 = self.get_query_info(node) 

348 if force or ( 

349 len(query_info2["integrations"]) > 1 

350 or main_integration not in query_info2["integrations"] 

351 or len(query_info2["mdb_entities"]) > 0 

352 ): 

353 # need to execute in planner 

354 

355 node.parentheses = False 

356 last_step = self.plan_select(node) 

357 

358 node2 = Parameter(last_step.result) 

359 

360 return node2 

361 

362 return find_selects 

363 

    def plan_select_identifier(self, query):
        """Plan a select whose FROM clause is a plain table/predictor identifier."""
        # query_info = self.get_query_info(query)
        #
        # if len(query_info['integrations']) == 0 and len(query_info['predictors']) >= 1:
        #     # select from predictor
        #     return self.plan_select_from_predictor(query)
        # elif (
        #     len(query_info['integrations']) == 1
        #     and len(query_info['mdb_entities']) == 0
        #     and len(query_info['user_functions']) == 0
        # ):
        #
        #     int_name = list(query_info['integrations'])[0]
        #     if self.integrations.get(int_name, {}).get('class_type') != 'api':
        #         # one integration without predictors, send all query to integration
        #         return self.plan_integration_select(query)

        # find subselects
        main_integration, _ = self.resolve_database_table(query.from_table)
        is_api_db = self.integrations.get(main_integration, {}).get("class_type") == "api"

        # plan nested sub-selects in targets/where first (mutates query in place)
        find_selects = self.get_nested_selects_plan_fnc(main_integration, force=is_api_db)
        query.targets = query_traversal(query.targets, find_selects)
        query_traversal(query.where, find_selects)

        # get info of updated query
        query_info = self.get_query_info(query)

        if len(query_info["predictors"]) >= 1:
            # select from predictor
            return self.plan_select_from_predictor(query)
        elif is_api_db:
            return self.plan_api_db_select(query)
        elif len(query_info["user_functions"]) > 0:
            return self.plan_integration_select_with_functions(query)
        else:
            # fallback to integration
            return self.plan_integration_select(query)

402 

    def plan_integration_select_with_functions(self, query):
        """Plan a select containing user-defined functions.

        The integration cannot evaluate UDFs, so the fetch pushes down a
        simplified query and the UDF conditions / aggregation run afterwards
        in a sub-select.
        """
        # UDF can't be aggregate function: it means we have to do aggregation after function execution
        # - remove targets from query
        # - add subselect with targets

        # replace functions in conditions

        query2 = query.copy()

        skipped_conditions = []

        def replace_functions(node, **kwargs):
            # neutralize UDF conditions in the pushed-down query; they are kept
            # in the outer query (via skipped_conditions) and applied later
            if not isinstance(node, BinaryOperation):
                return

            arg1, arg2 = node.args
            if not isinstance(arg1, Function):
                arg1 = arg2
            if not isinstance(arg1, Function):
                return

            # user defined
            if arg1.namespace is not None:
                # clear: rewrite condition to the always-true '0 = 0'
                skipped_conditions.append(node)
                node.args = [Constant(0), Constant(0)]
                node.op = "="

        query_traversal(query2.where, replace_functions)

        query2.targets = [Star()]

        # don't do aggregate
        query2.having = None

        if query.group_by is not None:
            # if aggregation exists, do order and limit in subquery
            query2.group_by = None
            query2.order_by = None
            query2.limit = None
        else:
            query.order_by = None
            query.limit = None

        # if all conditions were executed - clear it
        if len(skipped_conditions) == 0:
            query.where = None

        prev_step = self.plan_integration_select(query2)

        return self.plan_sub_select(query, prev_step)

454 

455 def plan_api_db_select(self, query): 

456 # split to select from api database 

457 # keep only limit and where 

458 # the rest goes to outer select 

459 query2 = Select( 

460 targets=query.targets, 

461 from_table=query.from_table, 

462 where=query.where, 

463 order_by=query.order_by, 

464 limit=query.limit, 

465 ) 

466 prev_step = self.plan_integration_select(query2) 

467 

468 # clear limit and where 

469 query.limit = None 

470 query.where = None 

471 return self.plan_sub_select(query, prev_step) 

472 

    def plan_nested_select(self, select):
        """Plan a select whose FROM clause is itself a sub-select."""
        # query_info = self.get_query_info(select)
        # # get all predictors
        #
        # if (
        #     len(query_info['mdb_entities']) == 0
        #     and len(query_info['integrations']) == 1
        #     and len(query_info['user_functions']) == 0
        #     and 'files' not in query_info['integrations']
        #     and 'views' not in query_info['integrations']
        # ):
        #     int_name = list(query_info['integrations'])[0]
        #     if self.integrations.get(int_name, {}).get('class_type') != 'api':
        #
        #         # if no predictor inside = run as is
        #         return self.plan_integration_nested_select(select, int_name)

        return self.plan_mdb_nested_select(select)

491 

492 def plan_mdb_nested_select(self, select): 

493 # plan nested select 

494 

495 select2 = copy.deepcopy(select.from_table) 

496 select2.parentheses = False 

497 select2.alias = None 

498 self.plan_select(select2) 

499 last_step = self.plan.steps[-1] 

500 

501 return self.plan_sub_select(select, last_step) 

502 

503 def get_predictor_namespace_and_name_from_identifier(self, identifier): 

504 new_identifier = copy.deepcopy(identifier) 

505 

506 info = self.get_predictor(identifier) 

507 namespace = info["integration_name"] 

508 

509 parts = [namespace, info["name"]] 

510 if info["version"] is not None: 

511 parts.append(info["version"]) 

512 new_identifier.parts = parts 

513 

514 return namespace, new_identifier 

515 

    def plan_select_from_predictor(self, select):
        """Plan `SELECT ... FROM predictor WHERE col = val ...` (row prediction)."""
        predictor_namespace, predictor = self.get_predictor_namespace_and_name_from_identifier(select.from_table)

        if select.where == BinaryOperation("=", args=[Constant(1), Constant(0)]):
            # Hardcoded mysql way of getting predictor columns
            predictor_identifier = utils.get_predictor_name_identifier(predictor)
            predictor_step = self.plan.add_step(
                GetPredictorColumns(namespace=predictor_namespace, predictor=predictor_identifier)
            )
        else:
            # NOTE(review): new_query_targets is built (and validates target
            # types) but is never used afterwards — confirm whether it should
            # replace select.targets or only exists for validation.
            new_query_targets = []
            for target in select.targets:
                if isinstance(target, Identifier):
                    new_query_targets.append(disambiguate_predictor_column_identifier(target, predictor))
                elif type(target) in (Star, Constant, Function):
                    new_query_targets.append(target)
                else:
                    raise PlanningException(f"Unknown select target {type(target)}")

            if select.group_by or select.having:
                raise PlanningException(
                    "Unsupported operation when querying predictor. Only WHERE is allowed and required."
                )

            row_dict = {}
            where_clause = select.where
            if not where_clause:
                raise PlanningException("WHERE clause required when selecting from predictor")

            predictor_identifier = utils.get_predictor_name_identifier(predictor)
            # WHERE equalities become the input row for the predictor
            recursively_extract_column_values(where_clause, row_dict, predictor_identifier)

            params = None
            if select.using is not None:
                params = select.using
            predictor_step = self.plan.add_step(
                ApplyPredictorRowStep(
                    namespace=predictor_namespace, predictor=predictor_identifier, row_dict=row_dict, params=params
                )
            )
        project_step = self.plan_project(select, predictor_step.result)
        return project_step

558 

    def plan_predictor(self, query, table, predictor_namespace, predictor):
        """Plan `table JOIN predictor`: fetch table data, then apply predictor.

        Returns a dict with the ApplyPredictorStep ('predictor') and the
        data-fetch step ('data').
        """
        int_select = copy.deepcopy(query)
        int_select.targets = [Star()]  # TODO why not query.targets?
        int_select.from_table = table

        predictor_alias = None
        if predictor.alias is not None:
            predictor_alias = predictor.alias.parts[0]

        params = {}
        if query.using is not None:
            params = query.using

        binary_ops = []
        table_filters = []
        model_filters = []

        def split_filters(node, **kwargs):
            # split conditions between model and table

            if isinstance(node, BinaryOperation):
                op = node.op.lower()

                binary_ops.append(op)

                if op in ["and", "or"]:
                    return

                # normalize to (identifier, constant) ordering
                arg1, arg2 = node.args
                if not isinstance(arg1, Identifier):
                    arg1, arg2 = arg2, arg1

                if isinstance(arg1, Identifier) and isinstance(arg2, (Constant, Parameter)) and len(arg1.parts) > 1:
                    model = Identifier(parts=arg1.parts[:-1])

                    # condition addressed to the predictor (by name or alias)
                    if self.is_predictor(model) or (len(model.parts) == 1 and model.parts[0] == predictor_alias):
                        model_filters.append(node)
                        return
                table_filters.append(node)

        # find subselects
        main_integration, _ = self.resolve_database_table(table)
        find_selects = self.get_nested_selects_plan_fnc(main_integration, force=True)
        query_traversal(int_select.where, find_selects)

        # split conditions
        query_traversal(int_select.where, split_filters)

        # model filters can only be stripped from the fetch when they are not
        # part of an OR expression (otherwise semantics would change)
        if len(model_filters) > 0 and "or" not in binary_ops:
            int_select.where = filters_to_bin_op(table_filters)

        integration_select_step = self.plan_integration_select(int_select)

        predictor_identifier = utils.get_predictor_name_identifier(predictor)

        if len(params) == 0:
            params = None

        # equality model filters become per-row predictor inputs
        row_dict = None
        if model_filters:
            row_dict = {}
            for el in model_filters:
                if isinstance(el.args[0], Identifier) and el.op == "=":
                    if isinstance(el.args[1], (Constant, Parameter)):
                        row_dict[el.args[0].parts[-1]] = el.args[1].value

        last_step = self.plan.add_step(
            ApplyPredictorStep(
                namespace=predictor_namespace,
                dataframe=integration_select_step.result,
                predictor=predictor_identifier,
                params=params,
                row_dict=row_dict,
            )
        )

        return {
            "predictor": last_step,
            "data": integration_select_step,
        }

639 

640 # def plan_group(self, query, last_step): 

641 # # ! is not using yet 

642 # 

643 # # check group 

644 # funcs = [] 

645 # for t in query.targets: 

646 # if isinstance(t, Function): 

647 # funcs.append(t.op.lower()) 

648 # agg_funcs = ['sum', 'min', 'max', 'avg', 'count', 'std'] 

649 # 

650 # if ( 

651 # query.having is not None 

652 # or query.group_by is not None 

653 # or set(agg_funcs) & set(funcs) 

654 # ): 

655 # # is aggregate 

656 # group_by_targets = [] 

657 # for t in query.targets: 

658 # target_copy = copy.deepcopy(t) 

659 # group_by_targets.append(target_copy) 

660 # # last_step = self.plan.steps[-1] 

661 # return GroupByStep(dataframe=last_step.result, columns=query.group_by, targets=group_by_targets) 

662 

663 def plan_project(self, query, dataframe, ignore_doubles=False): 

664 out_identifiers = [] 

665 

666 if len(query.targets) == 1 and isinstance(query.targets[0], Star): 

667 last_step = self.plan.steps[-1] 

668 return last_step 

669 

670 for target in query.targets: 

671 if ( 671 ↛ 680line 671 didn't jump to line 680 because the condition on line 671 was always true

672 isinstance(target, Identifier) 

673 or isinstance(target, Star) 

674 or isinstance(target, Function) 

675 or isinstance(target, Constant) 

676 or isinstance(target, BinaryOperation) 

677 ): 

678 out_identifiers.append(target) 

679 else: 

680 new_identifier = Identifier(str(target.to_string(alias=False)), alias=target.alias) 

681 out_identifiers.append(new_identifier) 

682 return self.plan.add_step( 

683 ProjectStep(dataframe=dataframe, columns=out_identifiers, ignore_doubles=ignore_doubles) 

684 ) 

685 

    def plan_create_table(self, query: CreateTable):
        """Plan CREATE TABLE: either an empty table from a column list, or a
        table filled from a planned SELECT."""
        if query.from_select is None:
            if query.columns is not None:
                self.plan.add_step(
                    CreateTableStep(
                        table=query.name,
                        columns=query.columns,
                        is_replace=query.is_replace,
                    )
                )
                return

            raise PlanningException(f'Not implemented "create table": {query.to_string()}')

        # target database is the first part of the table name
        integration_name = query.name.parts[0]

        last_step = self.plan_select(query.from_select, integration=integration_name)

        # create table step
        self.plan.add_step(
            SaveToTable(
                table=query.name,
                dataframe=last_step.result,
                is_replace=query.is_replace,
            )
        )

712 

    def plan_insert(self, query):
        """Plan INSERT: either from a planned SELECT or from literal VALUES."""
        table = query.table
        if query.from_select is not None:
            integration_name = query.table.parts[0]

            # plan sub-select first
            last_step = self.plan_select(query.from_select, integration=integration_name)

            # possible knowledge base parameters
            select = query.from_select
            params = {}
            if isinstance(select, Select) and select.using is not None:
                for k, v in select.using.items():
                    # only 'kb_*' keys are forwarded to the insert step
                    if k.startswith("kb_"):
                        params[k] = v

            self.plan.add_step(
                InsertToTable(
                    table=table,
                    dataframe=last_step.result,
                    params=params,
                )
            )
        else:
            # plain INSERT ... VALUES: pass the whole query through
            self.plan.add_step(
                InsertToTable(
                    table=table,
                    query=query,
                )
            )

743 

    def plan_update(self, query):
        """Plan UPDATE, optionally fed by a planned sub-select."""
        last_step = None
        if query.from_select is not None:
            integration_name = query.table.parts[0]
            last_step = self.plan_select(query.from_select, integration=integration_name)

        # plan sub-select first
        update_command = copy.deepcopy(query)
        # clear subselect
        update_command.from_select = None

        table = query.table
        # NOTE(review): sibling planners pass `last_step.result` as `dataframe`
        # (see plan_insert / plan_create_table); here the step object itself
        # (or None) is passed — confirm UpdateToTable expects a step.
        self.plan.add_step(UpdateToTable(table=table, dataframe=last_step, update_command=update_command))

757 

758 def plan_delete(self, query: Delete): 

759 # find subselects 

760 main_integration, _ = self.resolve_database_table(query.table) 

761 

762 is_api_db = self.integrations.get(main_integration, {}).get("class_type") == "api" 

763 

764 find_selects = self.get_nested_selects_plan_fnc(main_integration, force=is_api_db) 

765 query_traversal(query.where, find_selects) 

766 

767 self.prepare_integration_select(main_integration, query.where) 

768 

769 return self.plan.add_step(DeleteStep(table=query.table, where=query.where)) 

770 

771 def plan_cte(self, query): 

772 for cte in query.cte: 

773 step = self.plan_select(cte.query) 

774 name = _resolve_identifier_part(cte.name) 

775 self.cte_results[name] = step.result 

776 

    def check_single_integration(self, query):
        """If the whole query targets a single SQL integration with no mindsdb
        objects, push it down as one FetchDataframeStep.

        Returns the added step, or implicitly None when push-down is not possible.
        """
        query_info = self.get_query_info(query)

        # can we send all query to integration?

        # one integration and not mindsdb objects in query
        if (
            len(query_info["mdb_entities"]) == 0
            and len(query_info["integrations"]) == 1
            and "files" not in query_info["integrations"]
            and "views" not in query_info["integrations"]
            and len(query_info["user_functions"]) == 0
        ):
            int_name = list(query_info["integrations"])[0]
            # if is sql database
            if self.integrations.get(int_name, {}).get("class_type") != "api":
                # send to this integration
                self.prepare_integration_select(int_name, query)

                last_step = self.plan.add_step(FetchDataframeStep(integration=int_name, query=query))
                return last_step

798 

    def plan_select(self, query, integration=None):
        """Dispatch planning of a select by the type of its FROM clause."""
        if isinstance(query, (Union, Except, Intersect)):
            return self.plan_union(query, integration=integration)

        # CTEs must be planned before the main body references them
        if query.cte is not None:
            self.plan_cte(query)

        from_table = query.from_table

        if isinstance(from_table, Identifier):
            return self.plan_select_identifier(query)
        elif isinstance(from_table, Select):
            return self.plan_nested_select(query)
        elif isinstance(from_table, Join):
            plan_join = PlanJoin(self)
            return plan_join.plan(query, integration)
        elif isinstance(from_table, NativeQuery):
            # raw query: send as-is to the integration, post-process in sub-select
            integration = from_table.integration.parts[0].lower()
            step = FetchDataframeStep(integration=integration, raw_query=from_table.query)
            last_step = self.plan.add_step(step)
            return self.plan_sub_select(query, last_step)

        elif isinstance(from_table, ast.Data):
            step = DataStep(from_table.data)
            last_step = self.plan.add_step(step)
            return self.plan_sub_select(query, last_step, add_absent_cols=True)
        elif from_table is None:
            # one line select
            step = QueryStep(query, from_table=pd.DataFrame([None]))
            return self.plan.add_step(step)
        else:
            raise PlanningException(f"Unsupported from_table {type(from_table)}")

831 

832 def plan_sub_select(self, query, prev_step, add_absent_cols=False): 

833 if ( 

834 query.group_by is not None 

835 or query.order_by is not None 

836 or query.having is not None 

837 or query.distinct is True 

838 or query.where is not None 

839 or query.limit is not None 

840 or query.offset is not None 

841 or len(query.targets) != 1 

842 or not isinstance(query.targets[0], Star) 

843 ): 

844 if query.from_table.alias is not None: 

845 table_name = query.from_table.alias.parts[-1] 

846 elif isinstance(query.from_table, Identifier): 

847 table_name = query.from_table.parts[-1] 

848 else: 

849 table_name = None 

850 

851 query2 = copy.deepcopy(query) 

852 query2.from_table = None 

853 sup_select = SubSelectStep(query2, prev_step.result, table_name=table_name, add_absent_cols=add_absent_cols) 

854 self.plan.add_step(sup_select) 

855 return sup_select 

856 return prev_step 

857 

858 def plan_union(self, query, integration=None): 

859 step1 = self.plan_select(query.left, integration=integration) 

860 step2 = self.plan_select(query.right, integration=integration) 

861 operation = "union" 

862 if isinstance(query, Except): 862 ↛ 863line 862 didn't jump to line 863 because the condition on line 862 was never true

863 operation = "except" 

864 elif isinstance(query, Intersect): 864 ↛ 865line 864 didn't jump to line 865 because the condition on line 864 was never true

865 operation = "intersect" 

866 

867 return self.plan.add_step( 

868 UnionStep(left=step1.result, right=step2.result, unique=query.unique, operation=operation) 

869 ) 

870 

871 # method for compatibility 

    def from_query(self, query=None):
        """Entry point: build and return a QueryPlan for *query* (or self.query)."""
        self.plan = QueryPlan()

        if query is None:
            query = self.query

        if isinstance(query, (Select, Union, Except, Intersect)):
            # whole-query push-down short-circuits full planning
            if self.check_single_integration(query):
                return self.plan
            self.plan_select(query)
        elif isinstance(query, CreateTable):
            self.plan_create_table(query)
        elif isinstance(query, Insert):
            self.plan_insert(query)
        elif isinstance(query, Update):
            self.plan_update(query)
        elif isinstance(query, Delete):
            self.plan_delete(query)
        else:
            raise PlanningException(f"Unsupported query type {type(query)}")

        plan = self.handle_partitioning(self.plan)

        return plan

896 

    def handle_partitioning(self, plan: QueryPlan) -> QueryPlan:
        """
        If plan has fetching in partitions:
        try to rebuild plan to send fetched chunk of data through the following steps, if it is possible
        """

        # handle fetchdataframe partitioning
        steps_in = plan.steps
        steps_out = []

        step = None
        partition_step = None
        for step in steps_in:
            if isinstance(step, FetchDataframeStep) and step.params is not None:
                batch_size = step.params.get("batch_size")
                if batch_size is not None:
                    # found batched fetch: replace with a partitioned fetch that
                    # will absorb the following compatible steps
                    partition_step = FetchDataframeStepPartition(
                        step_num=step.step_num,
                        integration=step.integration,
                        query=step.query,
                        raw_query=step.raw_query,
                        params=step.params,
                    )
                    steps_out.append(partition_step)
                    # mark plan
                    plan.is_resumable = True
                    continue
                else:
                    step.params = None

            if partition_step is not None:
                # check and add step into partition

                can_be_partitioned = False
                if isinstance(step, (JoinStep, ApplyPredictorStep, InsertToTable)):
                    can_be_partitioned = True
                elif isinstance(step, QueryStep):
                    # a QueryStep can run per-chunk only when it has no
                    # cross-chunk semantics (no aggregation/ordering/limits)
                    query = step.query
                    if (
                        query.group_by is None
                        and query.order_by is None
                        and query.distinct is False
                        and query.limit is None
                        and query.offset is None
                    ):
                        no_identifiers = [
                            target for target in step.query.targets if not isinstance(target, (Star, Identifier))
                        ]
                        if len(no_identifiers) == 0:
                            can_be_partitioned = True

                if not can_be_partitioned:
                    if len(partition_step.steps) == 0:
                        # Nothing can be partitioned, failback to old plan
                        plan.is_resumable = False
                        return plan
                    partition_step = None
                else:
                    partition_step.steps.append(step)
                    continue

            steps_out.append(step)

        # `step` is now the last step of the original plan (or None when empty)
        if plan.is_resumable and isinstance(step, InsertToTable):
            plan.is_async = True
        else:
            # special case: register insert from select (it is the same as mark resumable)
            if (
                len(steps_in) == 2
                and isinstance(steps_in[0], FetchDataframeStep)
                and isinstance(steps_in[1], InsertToTable)
            ):
                plan.is_resumable = True

        plan.steps = steps_out
        return plan

974 

975 def prepare_steps(self, query): 

976 statement_planner = PreparedStatementPlanner(self) 

977 

978 # return generator 

979 return statement_planner.prepare_steps(query) 

980 

981 def execute_steps(self, params=None): 

982 statement_planner = PreparedStatementPlanner(self) 

983 

984 # return generator 

985 return statement_planner.execute_steps(params) 

986 

987 # def fetch(self, row_count): 

988 # statement_planner = PreparedStatementPlanner(self) 

989 # return statement_planner.fetch(row_count) 

990 # 

991 # def close(self): 

992 # statement_planner = PreparedStatementPlanner(self) 

993 # return statement_planner.close() 

994 

995 def get_statement_info(self): 

996 statement_planner = PreparedStatementPlanner(self) 

997 

998 return statement_planner.get_statement_info()