Coverage for mindsdb / api / executor / planner / utils.py: 85%

70 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import copy 

2from typing import List 

3 

4from mindsdb_sql_parser.ast import Identifier, Select, BinaryOperation, Constant, Parameter 

5from mindsdb_sql_parser import ast 

6 

7from mindsdb.integrations.utilities.query_traversal import query_traversal 

8from mindsdb.api.executor.planner.exceptions import PlanningException 

9 

10 

def get_predictor_name_identifier(identifier):
    """Return a copy of ``identifier`` with its first part dropped.

    The argument is deep-copied, so the caller's object is never modified.
    When the identifier has one part or fewer, the copy is returned
    unchanged.
    """
    stripped = copy.deepcopy(identifier)
    if len(stripped.parts) <= 1:
        # Nothing to strip: a lone part is kept as it is.
        return stripped
    del stripped.parts[0]
    return stripped

16 

17 

def disambiguate_predictor_column_identifier(identifier, predictor):
    """Return a new column identifier without the predictor's table qualifier.

    The table reference is ``predictor.alias.parts_to_str()`` when the
    predictor has an alias, otherwise ``predictor.parts_to_str()``. If the
    first part of ``identifier`` equals that reference and at least one more
    part follows it, the first part is removed. Nothing is ever added to the
    identifier: unqualified columns are returned as a plain copy.

    Args:
        identifier: column identifier exposing a ``parts`` sequence.
        predictor: predictor table identifier, optionally aliased.

    Returns:
        A new ``Identifier`` built from the (possibly shortened) parts; the
        input identifier is not modified.
    """
    table_ref = predictor.alias.parts_to_str() if predictor.alias else predictor.parts_to_str()
    parts = list(identifier.parts)

    # Only a multi-part identifier can carry a table qualifier. A single part
    # that happens to equal the table reference is the column itself, and
    # stripping it would leave an identifier with no parts at all. The length
    # check also avoids an IndexError on an empty parts list.
    if len(parts) > 1 and parts[0] == table_ref:
        parts = parts[1:]

    return Identifier(parts=parts)

27 

28 

def recursively_extract_column_values(op, row_dict, predictor):
    """Collect ``column = value`` pairs from a WHERE expression into ``row_dict``.

    Only two kinds of ``BinaryOperation`` are accepted: ``'and'``, whose two
    operands are processed recursively (left first), and ``'='``, whose left
    operand must be an ``Identifier`` and whose right operand must be a
    ``Constant`` or a ``Parameter``.

    Args:
        op: root node of the WHERE expression.
        row_dict: dict filled in place, keyed by the string form of the
            disambiguated column identifier. A ``Constant`` is stored as its
            ``.value``; a ``Parameter`` is stored as the node itself.
        predictor: predictor identifier used to strip the table qualifier
            from column names.

    Raises:
        PlanningException: on any other operation, on an ``=`` whose operands
            have the wrong node types, or when a column is assigned twice.
    """
    if not isinstance(op, BinaryOperation) or op.op not in ('=', 'and'):
        raise PlanningException(f'Only \'and\' and \'=\' operations allowed in WHERE clause, found: {op.to_tree()}')

    if op.op == 'and':
        recursively_extract_column_values(op.args[0], row_dict, predictor)
        recursively_extract_column_values(op.args[1], row_dict, predictor)
        return

    # From here on the node is an equality: <column> = <value>.
    column = op.args[0]
    value = op.args[1]

    if not isinstance(column, Identifier) or not isinstance(value, (Constant, Parameter)):
        raise PlanningException(f'The WHERE clause for selecting from a predictor'
                                f' must contain pairs \'Identifier(...) = Constant(...)\','
                                f' found instead: {column.to_tree()}, {value.to_tree()}')

    column = disambiguate_predictor_column_identifier(column, predictor)
    key = str(column)

    if key in row_dict:
        raise PlanningException(f'Multiple values provided for {key}')

    # Constants are unwrapped to their raw value; parameters stay as nodes.
    row_dict[key] = value.value if isinstance(value, Constant) else value

54 

55 

def get_deepest_select(select):
    """Return the innermost ``Select`` reached by following ``from_table``.

    Starting at ``select``, the chain is followed for as long as
    ``from_table`` is truthy and is itself a ``Select``. The first node whose
    ``from_table`` is empty or is not a ``Select`` is returned; this may be
    the argument itself.
    """
    current = select
    while current.from_table and isinstance(current.from_table, Select):
        current = current.from_table
    return current

60 

61 

def convert_join_to_list(join):
    """Flatten a left-nested join tree into an ordered list of table entries.

    The tree is walked down its left side. Every join contributes one entry
    for its right table carrying ``join_type``, ``is_implicit`` and
    ``condition``; the left operand of the deepest join becomes the first
    entry and holds only ``table``.

    Args:
        join: root ``ast.Join`` node.

    Returns:
        List of dicts, leftmost table first.

    Raises:
        NotImplementedError: if the right side of any visited join is itself
            a join.
    """
    collected = []
    node = join

    while True:
        if isinstance(node.right, ast.Join):
            raise NotImplementedError('Wrong join AST')

        # Join properties are attached to the right-hand table.
        collected.append({
            'table': node.right,
            'join_type': node.join_type,
            'is_implicit': node.implicit,
            'condition': node.condition,
        })

        if not isinstance(node.left, ast.Join):
            # Reached the first table of the chain.
            collected.append({'table': node.left})
            break

        # Go one level down the left side.
        node = node.left

    # Entries were gathered from the outermost join inwards; restore the
    # left-to-right table order.
    collected.reverse()
    return collected

88 

89 

def get_query_params(query):
    """Return every ``ast.Parameter`` node that ``query_traversal`` visits.

    Nodes are collected in the order the traversal hands them to the
    callback. The callback returns the parameter node itself for matches and
    ``None`` for every other node.
    """
    found = []

    def collect_parameter(node, **kwargs):
        if not isinstance(node, ast.Parameter):
            return None
        found.append(node)
        return node

    query_traversal(query, collect_parameter)
    return found

101 

102 

def fill_query_params(query, params):
    """Substitute parameter placeholders in ``query`` with constant values.

    Each ``ast.Parameter`` visited by ``query_traversal`` is answered with an
    ``ast.Constant`` wrapping the next value taken from the front of
    ``params``. The values are consumed from a deep copy, so the caller's
    list is left untouched.

    Args:
        query: AST to process; the same object is returned.
        params: list of values, one per placeholder, in visiting order.

    Returns:
        The ``query`` object that was passed in.

    Raises:
        IndexError: if more placeholders are visited than values supplied.
    """
    pending = copy.deepcopy(params)

    def substitute(node, **kwargs):
        if not isinstance(node, ast.Parameter):
            return None
        return ast.Constant(pending.pop(0))

    # NOTE(review): presumably query_traversal swaps in the returned node in
    # place; confirm against its implementation.
    query_traversal(query, substitute)

    return query

116 

117 

def filters_to_bin_op(filters: List[BinaryOperation]):
    """Combine ``filters`` into a single left-nested ``and`` expression.

    Returns ``None`` when ``filters`` is empty and the lone filter itself
    when there is exactly one. Otherwise each further filter is joined to the
    accumulated expression with ``BinaryOperation(op='and', ...)``.
    """
    combined = None
    for condition in filters:
        combined = (
            condition
            if combined is None
            else BinaryOperation(op='and', args=[combined, condition])
        )
    return combined