Coverage for mindsdb / interfaces / query_context / last_query.py: 26%

136 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Union, List 

2import copy 

3from collections import defaultdict 

4 

5from mindsdb_sql_parser.ast import ( 

6 Identifier, Select, BinaryOperation, Last, Constant, Star, ASTNode, NullConstant, OrderBy, Function, TypeCast 

7) 

8from mindsdb.integrations.utilities.query_traversal import query_traversal 

9 

10 

class LastQuery:
    """
    Wrapper for AST query.
    Intended to find, track, update last values in query

    Attributes:
    - query: the wrapped query in which every LAST was replaced by a constant;
      stays None when the input is not a Select or contains no LAST
    - query_orig: deep copy of the query taken right after LAST nodes were
      replaced by constants and before any value is applied; None when there
      is no LAST
    - last_idx: conditions which use LAST, grouped by id() of the (sub)query
      they were found in
    - last_tables: result of _find_last_columns, keyed by table name
    """

    def __init__(self, query: ASTNode):
        self.query_orig = None
        self.query = None
        # Defined before the early returns below so the public methods do not
        # fail with AttributeError on a query without LAST.
        self.last_idx = defaultdict(list)
        self.last_tables = {}

        # check query type
        if not isinstance(query, Select):
            # just skip it
            return

        last_tables = self._find_last_columns(query)
        if last_tables is None:
            return

        self.query = query
        self.last_tables = last_tables

    def _find_last_columns(self, query: ASTNode) -> Union[dict, None]:
        """
        This function:
         - Searches LAST column in the input query
         - Replaces it with constants and memorises link to these constants
         - Link to constants will be used to inject values to query instead of LAST
         - Provide checks:
           - if it is possible to find the table for column
           - if column in select target
         - Generates and returns last_column variable which is dict
            last_columns[table_name] = {
                'table': <table identifier>,
                'column': <column name>,
                'links': [<link to ast node>, ... ],
                'target_idx': <number of column in select target>,
                'gen_init_query': if true: to generate query to initial values for LAST
            }

        The input query is modified in place. As a side effect self.last_idx
        and self.query_orig are filled when at least one LAST is found.

        :param query: query to inspect
        :return: dict described above or None if the query has no LAST
        :raises ValueError: if the table of a LAST column can't be resolved,
            if the column is not in the select targets (and there is no star),
            or if several different LAST columns are used for one table
        """

        # index last variables in query
        tables_idx = defaultdict(dict)
        conditions = []

        def replace_last_in_tree(node: ASTNode, injected: Constant):
            """
            Recursively searches LAST in AST tree. Goes only into functions and type casts
            When LAST is found - it is replaced with injected constant

            :return: the injected constant if LAST was replaced, otherwise None
            """
            # go into functions and type casts
            if isinstance(node, TypeCast):
                if isinstance(node.arg, Last):
                    node.arg = injected
                    return injected
                return replace_last_in_tree(node.arg, injected)
            if isinstance(node, Function):
                for i, arg in enumerate(node.args):
                    if isinstance(arg, Last):
                        node.args[i] = injected
                        return injected
                    found = replace_last_in_tree(arg, injected)
                    if found is not None:
                        return found
            return None

        def index_query(node, is_table, parent_query, **kwargs):
            # callback for query_traversal; always returns None
            parent_query_id = id(parent_query)
            if is_table and isinstance(node, Identifier):
                # memorize table, both by name and by alias
                tables_idx[parent_query_id][node.parts[-1]] = node
                if node.alias is not None:
                    tables_idx[parent_query_id][node.alias.parts[-1]] = node

            # find last in where: only "<column> <op> <expression with LAST>"
            if not isinstance(node, BinaryOperation):
                return
            if not isinstance(node.args[0], Identifier):
                return

            col = node.args[0]
            if isinstance(node.args[1], Last):
                # col > last
                last = Constant(None)
                # inject constant
                node.args[1] = last
                gen_init_query = True
            else:
                # col > coalesce(last, 0) OR col > cast(coalesce(last ...))
                last = replace_last_in_tree(node.args[1], Constant(None))
                gen_init_query = False

            if last is None:
                return

            # memorize
            conditions.append({
                'query_id': parent_query_id,
                'condition': node,
                'last': last,
                'column': col,
                'gen_init_query': gen_init_query  # generate query to fetch initial last values from table
            })

        # find lasts
        query_traversal(query, index_query)

        if not conditions:
            return None

        self.query_orig = copy.deepcopy(query)

        for info in conditions:
            self.last_idx[info['query_id']].append(info)

        # index query targets (of the top-level query only)
        top_tables = tables_idx[id(query)]
        # A table with an alias is registered under two keys, so count the
        # distinct table nodes - the same rule is used for conditions below.
        top_uniq_tables = len({id(v) for v in top_tables.values()})
        is_star_in_target = False
        target_idx = {}
        for i, target in enumerate(query.targets):
            if isinstance(target, Star):
                is_star_in_target = True
                continue
            if not isinstance(target, Identifier):
                continue

            if len(target.parts) > 1:
                table = top_tables.get(target.parts[-2])
                if table is None:
                    # qualifier is not a known table or alias: can't index it
                    continue
            elif top_uniq_tables == 1:
                table = next(iter(top_tables.values()))
            else:
                continue

            target_idx[(table.parts[-1], target.parts[-1])] = i

        # make info about query

        last_columns = {}
        for parent_query_id, items in self.last_idx.items():
            query_tables = tables_idx[parent_query_id]
            uniq_tables = len({id(v) for v in query_tables.values()})

            for info in items:
                col = info['column']
                last = info['last']

                if len(col.parts) > 1:
                    table = query_tables.get(col.parts[-2])
                    if table is None:
                        raise ValueError('cant find table')
                elif uniq_tables == 1:
                    table = next(iter(query_tables.values()))
                else:
                    # or just skip it?
                    raise ValueError('cant find table')

                col_name = col.parts[-1]
                table_name = table.parts[-1]

                if table_name not in last_columns:
                    # check column in target
                    # (separate name: target_idx must stay a dict for the next columns)
                    col_target_idx = target_idx.get((table_name, col_name))
                    if col_target_idx is None and not is_star_in_target:
                        raise ValueError('Last value should be in query target')
                    # with a star in target the index stays None:
                    # will try to get by name

                    last_columns[table_name] = {
                        'table': table,
                        'column': col_name,
                        'links': [last],
                        'target_idx': col_target_idx,
                        'gen_init_query': info['gen_init_query']
                    }

                elif last_columns[table_name]['column'] == col_name:
                    # one more LAST for the same column: track its constant too
                    last_columns[table_name]['links'].append(last)
                else:
                    raise ValueError('possible to use only one column')

        return last_columns

    def to_string(self) -> str:
        """
        String representation of the query
        Used to identify query in query_context table

        Uses query_orig, so it is usable only if LAST was found in the query
        """
        return self.query_orig.to_string()

    def get_last_columns(self) -> List[dict]:
        """
        Return information about LAST columns in query
        :return: list of dicts with keys: table, table_name, column_name,
            target_idx, gen_init_query; one dict per table
        """
        return [
            {
                'table': info['table'],
                'table_name': table_name,
                'column_name': info['column'],
                'target_idx': info['target_idx'],
                'gen_init_query': info['gen_init_query'],
            }
            for table_name, info in self.last_tables.items()
        ]

    def apply_values(self, values: dict) -> ASTNode:
        """
        Fills query with new values and return it

        :param values: {table_name: {column_name: value}}; a missing table or
            column sets None into the query
        :return: the wrapped query (the same object on every call)
        """
        for table_name, info in self.last_tables.items():
            value = values.get(table_name, {}).get(info['column'])
            for last in info['links']:
                last.value = value

        return self.query

    def get_init_queries(self):
        """
        A generator of queries to get initial value of the last

        Yields (query, info) for every item of get_last_columns() which has
        'gen_init_query' set. The query is a copy of the wrapped query where
        conditions with LAST are replaced with "<column> is not NULL", the
        only target is the LAST column, ordered by it descending, limit 1.
        """

        conditions = [
            info['condition']
            for items in self.last_idx.values()
            for info in items
        ]
        back_up_values = [(node.op, node.args[1]) for node in conditions]

        try:
            # replace values
            for node in conditions:
                node.op = 'is not'
                node.args[1] = NullConstant()

            template = copy.deepcopy(self.query)
        finally:
            # return values, even if copying has failed
            for node, (op, arg1) in zip(conditions, back_up_values):
                node.op = op
                node.args[1] = arg1

        for info in self.get_last_columns():
            if not info['gen_init_query']:
                continue
            # every yielded query is a separate object: changes made for the
            # next column must not affect a query which was already yielded
            init_query = copy.deepcopy(template)
            col = Identifier(info['column_name'])
            init_query.targets = [col]
            init_query.order_by = [
                OrderBy(col, direction='DESC')
            ]
            init_query.limit = Constant(1)
            yield init_query, info

264 ] 

265 query2.limit = Constant(1) 

266 yield query2, info