Coverage for mindsdb / interfaces / query_context / last_query.py: 26%
136 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Union, List
2import copy
3from collections import defaultdict
5from mindsdb_sql_parser.ast import (
6 Identifier, Select, BinaryOperation, Last, Constant, Star, ASTNode, NullConstant, OrderBy, Function, TypeCast
7)
8from mindsdb.integrations.utilities.query_traversal import query_traversal
class LastQuery:
    """
    Wrapper for AST query.
    Intended to find, track, update last values in query

    Attributes:
    - query: the input query with every LAST replaced by a Constant placeholder,
      or None when the input is not a Select or has no LAST in it
    - query_orig: deep copy of the query taken right after the placeholders were injected,
      or None when no LAST was found
    - last_idx: for every (sub)query id, the list of conditions which contained LAST
    - last_tables: for every table name, info about its LAST column (see _find_last_columns)
    """

    def __init__(self, query: ASTNode):
        self.query_orig = None
        self.query = None
        # always defined, so accessors don't fail with AttributeError when query has no LAST
        self.last_idx = defaultdict(list)
        self.last_tables = {}

        # check query type
        if not isinstance(query, Select):
            # just skip it
            return

        last_tables = self._find_last_columns(query)
        if last_tables is None:
            return

        self.query = query
        self.last_tables = last_tables

    def _find_last_columns(self, query: ASTNode) -> Union[dict, None]:
        """
        This function:
         - Searches LAST column in the input query
         - Replaces it with constants and memorises link to these constants
         - Link to constants will be used to inject values to query instead of LAST
         - Provide checks:
           - if it is possible to find the table for column
           - if column in select target
         - Generates and returns last_column variable which is dict
            last_columns[table_name] = {
                'table': <table identifier>,
                'column': <column name>,
                'links': [<link to ast node>, ... ],
                'target_idx': <number of column in select target>,
                'gen_init_query': if true: to generate query to initial values for LAST
            }

        Side effects: mutates the input query in place, fills self.last_idx and sets self.query_orig

        :return: dict described above or None if there is no LAST in the query
        :raises ValueError: if table of LAST column can't be resolved, column is not in target
                            or several LAST columns are used for the same table
        """

        # index last variables in query
        tables_idx = defaultdict(dict)
        conditions = []

        def replace_last_in_tree(node: ASTNode, injected: Constant):
            """
            Recursively searches LAST in AST tree. Goes only into functions and type casts
            When LAST is found - it is replaced with injected constant
            Returns injected constant if replacement was made, otherwise None
            """
            # go into functions and type casts
            if isinstance(node, TypeCast):
                if isinstance(node.arg, Last):
                    node.arg = injected
                    return injected
                return replace_last_in_tree(node.arg, injected)
            if isinstance(node, Function):
                for i, arg in enumerate(node.args):
                    if isinstance(arg, Last):
                        node.args[i] = injected
                        return injected
                    found = replace_last_in_tree(arg, injected)
                    if found is not None:
                        return found
            return None

        def index_query(node, is_table, parent_query, **kwargs):
            # callback for query_traversal; has to return None in order to keep the node in place
            parent_query_id = id(parent_query)

            if is_table and isinstance(node, Identifier):
                # memorize table, by name and by alias
                tables_idx[parent_query_id][node.parts[-1]] = node
                if node.alias is not None:
                    tables_idx[parent_query_id][node.alias.parts[-1]] = node

            # find last in where: only conditions with a column on the left side are supported
            if not isinstance(node, BinaryOperation) or not isinstance(node.args[0], Identifier):
                return None

            col = node.args[0]
            if isinstance(node.args[1], Last):
                # col > last
                last = Constant(None)
                # inject constant
                node.args[1] = last
                gen_init_query = True
            else:
                # col > coalesce(last, 0) OR col > cast(coalesce(last ...))
                last = replace_last_in_tree(node.args[1], Constant(None))
                gen_init_query = False

            if last is not None:
                # memorize
                conditions.append({
                    'query_id': parent_query_id,
                    'condition': node,
                    'last': last,
                    'column': col,
                    'gen_init_query': gen_init_query  # generate query to fetch initial last values from table
                })
            return None

        # find lasts
        query_traversal(query, index_query)

        if not conditions:
            return None

        # the copy is made after injection: it contains placeholders instead of LAST
        self.query_orig = copy.deepcopy(query)

        for info in conditions:
            self.last_idx[info['query_id']].append(info)

        # index query targets
        target_idx, is_star_in_target = self._index_targets(query, tables_idx[id(query)])

        # make info about query
        return self._build_last_columns(self.last_idx, tables_idx, target_idx, is_star_in_target)

    @staticmethod
    def _index_targets(query, tables: dict):
        """
        Maps (table name, column name) of every identifier in query targets to its position

        :param query: select query, its targets are indexed
        :param tables: tables of the query: {name or alias: table identifier}
        :return: tuple (target_idx, is_star_in_target)
        """
        is_star_in_target = False
        target_idx = {}
        for i, target in enumerate(query.targets):
            if isinstance(target, Star):
                is_star_in_target = True
                continue
            if not isinstance(target, Identifier):
                continue

            if len(target.parts) > 1:
                table = tables.get(target.parts[-2])
            elif len(tables) == 1:
                # NOTE(review): aliased table is registered under two keys and is skipped here,
                #  while _build_last_columns counts distinct tables - confirm if it is intended
                table = next(iter(tables.values()))
            else:
                continue

            if table is None:
                # prefix of the target is not a known table: it can't be matched with LAST column
                continue

            target_idx[(table.parts[-1], target.parts[-1])] = i
        return target_idx, is_star_in_target

    @staticmethod
    def _build_last_columns(last_idx: dict, tables_idx: dict, target_idx: dict, is_star_in_target: bool) -> dict:
        """
        Groups found LAST conditions by table and validates them

        :param last_idx: {query id: [condition info, ...]}
        :param tables_idx: {query id: {name or alias: table identifier}}
        :param target_idx: {(table name, column name): position in select target}
        :param is_star_in_target: if true: column is allowed to be missing in target_idx
        :return: last_columns dict, the structure is described in _find_last_columns
        :raises ValueError: if validation is failed
        """
        last_columns = {}
        for parent_query_id, items in last_idx.items():
            tables = tables_idx.get(parent_query_id, {})
            # the same table can be stored twice: by name and by alias
            uniq_tables = len({id(v) for v in tables.values()})

            for info in items:
                col = info['column']
                last = info['last']

                if len(col.parts) > 1:
                    table = tables.get(col.parts[-2])
                    if table is None:
                        raise ValueError('cant find table')
                elif uniq_tables == 1:
                    table = next(iter(tables.values()))
                else:
                    # or just skip it?
                    raise ValueError('cant find table')

                col_name = col.parts[-1]
                table_name = table.parts[-1]

                known = last_columns.get(table_name)
                if known is None:
                    # check column in target
                    # separate name is used: target_idx dict has to stay intact for the next tables
                    col_idx = target_idx.get((table_name, col_name))
                    if col_idx is None and not is_star_in_target:
                        raise ValueError('Last value should be in query target')
                    # if col_idx is None here: will try to get by name

                    last_columns[table_name] = {
                        'table': table,
                        'column': col_name,
                        'links': [last],
                        'target_idx': col_idx,
                        'gen_init_query': info['gen_init_query']
                    }
                elif known['column'] == col_name:
                    # one more LAST for the same column: memorize link to update it too
                    known['links'].append(last)
                else:
                    raise ValueError('possible to use only one column')

        return last_columns

    def to_string(self) -> str:
        """
        String representation of the query
        Used to identify query in query_context table
        query_orig is None if LAST wasn't found: call it only when self.query is not None
        """
        return self.query_orig.to_string()

    def get_last_columns(self) -> List[dict]:
        """
        Return information about LAST columns in query
        :return: list of dicts with keys: table, table_name, column_name, target_idx, gen_init_query
        """
        return [
            {
                'table': info['table'],
                'table_name': table_name,
                'column_name': info['column'],
                'target_idx': info['target_idx'],
                'gen_init_query': info['gen_init_query'],
            }
            for table_name, info in self.last_tables.items()
        ]

    def apply_values(self, values: dict) -> ASTNode:
        """
        Fills query with new values and return it

        :param values: {table name: {column name: value}}, missing value is applied as None
        :return: self.query (the same object, updated in place)
        """
        for table_name, info in self.last_tables.items():
            value = values.get(table_name, {}).get(info['column'])
            for last in info['links']:
                last.value = value

        return self.query

    def get_init_queries(self):
        """
        A generator of queries to get initial value of the last
        Yields (query, info): info is an item of get_last_columns, only items with gen_init_query are used
        Every yielded query is a separate object
        """

        back_up_values = []
        try:
            # replace values: conditions with LAST are temporarily turned into "<column> is not NULL"
            for items in self.last_idx.values():
                for info in items:
                    node = info['condition']
                    back_up_values.append((node, node.op, node.args[1]))
                    node.op = 'is not'
                    node.args[1] = NullConstant()

            template = copy.deepcopy(self.query)
        finally:
            # return values, even if copying is failed: self.query must keep links to placeholders
            for node, op, arg1 in back_up_values:
                node.op = op
                node.args[1] = arg1

        for info in self.get_last_columns():
            if not info['gen_init_query']:
                continue
            # fresh copy: previously yielded query is not changed by the next iteration
            query2 = copy.deepcopy(template)
            col = Identifier(info['column_name'])
            query2.targets = [col]
            query2.order_by = [
                OrderBy(col, direction='DESC')
            ]
            query2.limit = Constant(1)
            yield query2, info