Coverage for mindsdb / api / executor / planner / query_prepare.py: 71%

278 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import copy 

2from mindsdb_sql_parser import ast 

3 

4from mindsdb.api.executor.planner.exceptions import PlanningException 

5from mindsdb.api.executor.planner import steps 

6from mindsdb.api.executor.planner import utils 

7 

8 

def to_string(identifier):
    """Join the parts of an identifier with dots, without any quoting.

    This is an alternative to ``AST.to_string()`` that never quotes a part,
    e.g. parts ``['db', 'tbl']`` become ``'db.tbl'``.
    """
    separator = "."
    return separator.join(identifier.parts)

12 

13 

class Table:
    """Planner-side record of a table (or predictor) referenced by a prepared query.

    Attributes:
        node: the AST identifier of the table.
        is_predictor: whether the identifier was resolved as a predictor.
        ds: namespace/datasource the table belongs to.
        name: dot-joined, unquoted parts of ``node``.
        alias: dot-joined alias parts, or None when ``node`` has no alias.
        columns: column descriptions (objects with ``name``/``type``), None while unknown.
        columns_map: upper-cased column name -> column description, None while unknown.
        keys: names under which the planner registers this table for lookups.
    """

    def __init__(self, node=None, ds=None, is_predictor=None):
        self.node = node
        self.is_predictor = is_predictor
        self.ds = ds
        self.name = to_string(node)
        # column info is fetched later; None means "not known yet"
        self.columns = None
        self.columns_map = None
        self.keys = None
        self.alias = to_string(node.alias) if node.alias else None

29 

30 

class Column:
    """A result column of a prepared statement.

    Args:
        node: AST node of the select target. When it is an ``ast.Identifier``,
            its last part becomes the column name.
        table: owning ``Table``, or None when unknown.
        name: explicit column name; overridden when ``node`` is an identifier.
        type: type label such as "str", "integer" or "float".
    """

    def __init__(self, node=None, table=None, name=None, type=None):
        if node is None:
            # no AST node given: synthesize ``<table>.<name>`` when both are known
            if table is not None and name is not None:
                node = ast.Identifier(parts=[table.name, name])
        elif isinstance(node, ast.Identifier):
            name = node.parts[-1]

        self.node = node  # link to AST
        self.alias = None  # assigned later by the planner (used for ProjectStep)
        self.is_star = False
        self.table = table  # link to the Table record
        self.name = name  # column name
        self.type = type

51 

52 

class Statement:
    """Mutable state of one prepared statement, shared between prepare and execute."""

    def __init__(self):
        # described result columns
        self.columns = []
        # parameters found in the query; the executor resets this to None after running
        self.params = None
        self.result = None
        # tables on the first level of the select
        self.tables_lvl1 = None
        # tables by every name they can be referenced by, e.g. {'ta.ble': Table()}
        self.tables_map = None
        self.offset = 0

67 

68 

class PreparedStatementPlanner:
    """Planner for prepared statements (PREPARE / EXECUTE).

    ``prepare_steps`` analyses a query before parameter values are known and
    describes its result columns and parameters. ``execute_steps`` substitutes
    the parameter values and plans the query with the wrapped planner.

    The wrapped ``planner`` holds the shared state: this class reads and writes
    ``planner.statement`` and ``planner.query``.
    """

    def __init__(self, planner):
        self.planner = planner

    def get_type_of_var(self, v):
        """Return the column type label for a Python constant.

        ``bool`` is a subclass of ``int`` and therefore maps to "integer".
        Anything that is not str/float/int falls back to "str".
        """
        if isinstance(v, str):
            return "str"
        elif isinstance(v, float):
            return "float"
        elif isinstance(v, int):
            return "integer"
        return "str"

    def get_statement_info(self):
        """Describe the prepared statement's result columns and parameters.

        Returns:
            dict with keys ``"parameters"`` and ``"columns"``, each a list of
            dicts describing one parameter or column.

        Raises:
            PlanningException: if no statement has been prepared.
        """
        stmt = self.planner.statement

        if stmt is None:
            raise PlanningException("Statement is not prepared")

        columns_result = []
        for column in stmt.columns:
            table_name, table_alias, ds = None, None, None
            if column.table is not None:
                table_name = column.table.name
                # report the alias the query used for the table, if it had one
                table_alias = column.table.alias or table_name
                ds = column.table.ds
            columns_result.append(
                dict(
                    alias=column.alias,
                    type=column.type,
                    name=column.name,
                    table_name=table_name,
                    table_alias=table_alias,
                    ds=ds,
                )
            )

        # parameter types are unknown at prepare time: each one is described as a "?" string.
        # execute_steps resets params to None, so treat None as "no parameters".
        parameters = [dict(alias="?", type="str", name="?") for _ in stmt.params or []]

        return {"parameters": parameters, "columns": columns_result}

    def get_table_of_column(self, t):
        """Find the first-level table that a column identifier is qualified with.

        The qualifier (all parts of ``t`` but the last) is looked up in
        ``statement.tables_map``. If it is not found and has more than one part,
        the lookup is retried without its first part (maybe the datasource).

        Returns:
            the matching ``Table``, or None if the column is unqualified or
            its table is unknown.
        """
        tables_map = self.planner.statement.tables_map

        if len(t.parts) < 2:
            return None

        table_parts = t.parts[:-1]
        table_name = ".".join(table_parts)
        if table_name in tables_map:
            return tables_map[table_name]

        if len(table_parts) > 1:
            # maybe datasource is the 1st part
            return tables_map.get(".".join(table_parts[1:]))
        return None

    def table_from_identifier(self, table):
        """Resolve a table identifier into a ``Table`` record.

        The identifier is resolved through the planner, either as a predictor
        or as a regular database table. ``Table.keys`` holds the names the
        table can be referenced by:
        - only its alias, when it has one;
        - otherwise every suffix of its name parts (``tbl``, ``db.tbl``, ...).
        """
        if self.planner.is_predictor(table):
            ds, table = self.planner.get_predictor_namespace_and_name_from_identifier(table)
            is_predictor = True
        else:
            ds, table = self.planner.resolve_database_table(table)
            is_predictor = False

        if table.alias is not None:
            # access by alias if table is having alias
            keys = [to_string(table.alias)]
        else:
            # access by table name in all variants, shortest suffix first
            parts = table.parts
            keys = [".".join(parts[i:]) for i in range(len(parts) - 1, -1, -1)]

        tbl = Table(ds=ds, node=table, is_predictor=is_predictor)
        tbl.keys = keys
        return tbl

    @staticmethod
    def _get_columns_step(table):
        """Build the step that fetches column info for ``table`` (predictor or regular table)."""
        if table.is_predictor:
            return steps.GetPredictorColumns(namespace=table.ds, predictor=table.node)
        return steps.GetTableColumns(namespace=table.ds, table=table.name)

    @staticmethod
    def _expand_table_columns(table):
        """Return a new ``Column``, aliased by its own name, for every known column of ``table``."""
        result = []
        for col in table.columns:
            column = Column(table=table, name=col.name)
            column.alias = col.name
            column.type = col.type
            result.append(column)
        return result

    def _collect_first_level_tables(self, query, stmt):
        """Fill ``stmt.tables_lvl1`` and ``stmt.tables_map`` from the FROM clause of ``query``.

        Raises:
            PlanningException: if a predictor is not the last joined table, or
                if a predictor is used anywhere other than the first level.
        """
        # every predictor referenced anywhere in the query
        query_predictors = []

        def find_predictors(node, is_table, **kwargs):
            if is_table and isinstance(node, ast.Identifier) and self.planner.is_predictor(node):
                query_predictors.append(node)

        utils.query_traversal(query, find_predictors)

        stmt.tables_map = {}
        stmt.tables_lvl1 = []
        if query.from_table is not None:
            if isinstance(query.from_table, ast.Join):
                # get all tables
                join_tables = utils.convert_join_to_list(query.from_table)
            elif isinstance(query.from_table, ast.Select):
                # nested select, get only last select
                join_tables = [dict(table=utils.get_deepest_select(query.from_table).from_table)]
            else:
                join_tables = [dict(table=query.from_table)]

            for i, join_table in enumerate(join_tables):
                table = join_table["table"]
                if not isinstance(table, ast.Identifier):
                    # don't add unknown table to looking list
                    continue

                tbl = self.table_from_identifier(table)
                if tbl.is_predictor and i + 1 < len(join_tables):
                    raise PlanningException("Predictor must be last table in query")

                stmt.tables_lvl1.append(tbl)
                for key in tbl.keys:
                    stmt.tables_map[key] = tbl

        # is there any predictors at other levels?
        lvl1_predictors = [t for t in stmt.tables_lvl1 if t.is_predictor]
        if len(query_predictors) != len(lvl1_predictors):
            raise PlanningException("Predictor is not at first level")

    def _describe_targets(self, query, stmt):
        """Build a ``Column`` for every select target of ``query``.

        Returns:
            ``(columns, get_all_tables)``. ``get_all_tables`` is True when the
            column info of every first-level table is needed: a star target, or
            an identifier whose table could not be determined.

        Raises:
            PlanningException: if a star is selected without any FROM table.
        """
        columns = []
        get_all_tables = False
        for t in query.targets:
            column = Column(t)

            # column alias
            alias = to_string(t.alias) if t.alias is not None else None

            if isinstance(t, ast.Star):
                if len(stmt.tables_lvl1) == 0:
                    # if "from" is empty we can't make plan
                    raise PlanningException("Can't find table")
                column.is_star = True
                get_all_tables = True

            elif isinstance(t, ast.Identifier):
                if alias is None:
                    alias = t.parts[-1]
                table = self.get_table_of_column(t)
                if table is None:
                    # table is not known
                    get_all_tables = True
                else:
                    column.table = table

            elif isinstance(t, ast.Constant):
                if alias is None:
                    alias = str(t.value)
                column.type = self.get_type_of_var(t.value)

            elif isinstance(t, ast.Function):
                # mysql function
                column.type = "integer" if t.op == "connection_id" else "str"

            else:
                # TODO: go down into lower levels (function, operation, select),
                #  add known types for functions (e.g. ABS -> int) and use the
                #  target type of TypeCast. For now it is shown as a string.
                column.type = "str"

            if alias is not None:
                column.alias = alias
            columns.append(column)

        return columns, get_all_tables

    def _build_result_columns(self, columns, stmt):
        """Expand stars and resolve types and aliases of ``columns`` into the final list.

        Raises:
            PlanningException: if a bare star needs a table whose columns were
                not fetched, or if a qualified column is missing from its
                table's known columns.
        """
        columns_result = []
        for i, column in enumerate(columns):
            if column.is_star:
                # add data from all tables
                for table in stmt.tables_lvl1:
                    if table.columns is None:
                        raise PlanningException(f"Table is not found {table.name}")
                    columns_result.extend(self._expand_table_columns(table))
                continue

            if column.name is not None:
                # is Identifier
                if isinstance(column.name, ast.Star):
                    # qualified star (``tbl.*``): expand that table's columns when known
                    if column.table is not None and column.table.columns is not None:
                        columns_result.extend(self._expand_table_columns(column.table))
                    continue

                col_name = column.name.upper()
                if column.table is not None:
                    table = column.table
                    if table.columns_map is not None:
                        if col_name not in table.columns_map:
                            raise PlanningException(f"Column not found {col_name}")
                        column.type = table.columns_map[col_name].type
                else:
                    # table is not found, looking for in all tables
                    for table in stmt.tables_lvl1:
                        if table.columns_map is None:
                            continue
                        col = table.columns_map.get(col_name)
                        if col is not None:
                            column.type = col.type
                            column.table = table
                            break

            # forcing alias
            if column.alias is None:
                column.alias = f"column_{i}"
            # forcing type
            if column.type is None:
                column.type = "str"

            columns_result.append(column)

        return columns_result

    def prepare_select(self, query):
        """Describe the result columns of a select, with or without predictors.

        This is a generator. It yields a column-info step for each first-level
        table whose columns are needed, and reads ``step.result_data`` after
        each yield (presumably filled in by the caller). It then stores the
        column list in ``self.planner.statement.columns``.

        Raises:
            PlanningException: see ``_collect_first_level_tables``,
                ``_describe_targets`` and ``_build_result_columns``.
        """
        stmt = self.planner.statement

        self._collect_first_level_tables(query, stmt)
        columns, get_all_tables = self._describe_targets(query, stmt)

        # === get columns from tables ===
        request_tables = {column.table.name for column in columns if column.table is not None}

        for table in stmt.tables_lvl1:
            if get_all_tables or table.name in request_tables:
                step = self._get_columns_step(table)
                yield step

                if step.result_data is not None:
                    # save results
                    table.columns = step.result_data.columns
                    table.columns_map = {col.name.upper(): col for col in step.result_data.columns}

        # save columns
        stmt.columns = self._build_result_columns(columns, stmt)

    def prepare_insert(self, query):
        """Describe the columns of an INSERT statement.

        This is a generator. It yields one column-info step for the target
        table and stores a ``Column`` per inserted column in
        ``statement.columns``.
        NOTE(review): currently unused, because ``prepare_steps`` does not call it.
        """
        stmt = self.planner.statement

        # get table columns
        table = self.table_from_identifier(query.table)
        step = self._get_columns_step(table)
        yield step

        if step.result_data is not None and len(step.result_data["tables"]) > 0:
            # save results
            table_info = step.result_data["tables"][0]
            columns_info = step.result_data["columns"][table_info]

            table.columns = []
            table.ds = table_info[0]
            for col in columns_info:
                if isinstance(col, tuple):
                    # presumably predictor columns, which carry no type
                    col = dict(name=col[0], type="str")
                table.columns.append(Column(name=col["name"], type=col["type"]))

            # map by upper-cased names
            table.columns_map = {c.name.upper(): c for c in table.columns}

        columns_result = []
        for target in query.columns:
            col_name = target.parts[-1]
            column = Column(table=table, name=col_name)

            if table.columns_map is not None:
                # columns_map is keyed by upper-cased names
                known = table.columns_map.get(col_name.upper())
                if known is not None:
                    column.type = known.type

            if column.type is None:
                # forcing type
                column.type = "str"

            columns_result.append(column)

        stmt.columns = columns_result

    def prepare_show(self, query):
        """Describe a SHOW statement as two string columns: Variable_name and Value."""
        stmt = self.planner.statement

        stmt.columns = [
            Column(name="Variable_name", type="str"),
            Column(name="Value", type="str"),
        ]
        return []

    def prepare_steps(self, query):
        """Start preparing ``query``: create a fresh statement and describe it.

        The original query is stored on the planner, and a deep copy of it is
        analysed.

        Returns:
            an iterable of steps: a generator for Select/Union queries,
            otherwise a list.
        """
        stmt = Statement()
        self.planner.statement = stmt
        self.planner.query = query

        query = copy.deepcopy(query)
        stmt.params = utils.get_query_params(query)

        # get columns
        if isinstance(query, ast.Select):
            return self.prepare_select(query)
        if isinstance(query, ast.Union):
            # get column definition only from the left select
            return self.prepare_select(query.left)
        if isinstance(query, ast.Show):
            return self.prepare_show(query)
        # Insert, Delete and anything else: no column info is prepared.
        # TODO do we need columns for Insert/Delete?
        return []

    def execute_steps(self, params=None):
        """Fill in parameter values and plan the prepared query.

        Args:
            params: values for the query parameters in order, or None.

        Returns:
            an iterable of planned steps for supported statement types,
            otherwise an empty list.

        Raises:
            PlanningException: if ``params`` are given without a prepared
                statement, or if their count does not match the statement's
                parameters. This also covers a repeated execution, after
                which the statement no longer holds parameters.
        """
        stmt = self.planner.statement

        # no prepared statement: only a parameterless execution is possible
        if stmt is None:
            if params is not None:
                raise PlanningException("Can't execute statement")
            stmt = Statement()

        # === form query with new target ===
        query = self.planner.query

        if params is not None:
            # stmt.params is reset to None after an execution
            expected = stmt.params or []
            if len(params) != len(expected):
                raise PlanningException("Count of execution parameters don't match prepared statement")

            query = utils.fill_query_params(query, params)
            self.planner.query = query

        # prevent from second execution
        stmt.params = None

        plannable = (
            ast.Select,
            ast.Union,
            ast.CreateTable,
            ast.Insert,
            ast.Update,
            ast.Delete,
            ast.Intersect,
            ast.Except,
        )
        if isinstance(query, plannable):
            return self.plan_query(query)
        return []

    def plan_query(self, query):
        """Plan ``query`` with the wrapped (v1) planner and lazily yield its steps."""
        self.planner.from_query(query)
        yield from self.planner.plan.steps