Coverage for mindsdb / interfaces / knowledge_base / executor.py: 52%
190 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from dataclasses import dataclass
2import copy
3from typing import List, Optional, Union
5from mindsdb_sql_parser.ast import (
6 BinaryOperation,
7 Identifier,
8 Constant,
9 UnaryOperation,
10 Select,
11 Star,
12 Tuple,
13 ASTNode,
14 BetweenOperation,
15 NullConstant,
16)
17import pandas as pd
19from mindsdb.integrations.utilities.query_traversal import query_traversal
@dataclass
class ConditionBlock:
    """
    Flat group of conditions combined with one logical operator.

    Built by ``KnowledgeBaseQueryExecutor.flatten_conditions`` from nested AND/OR trees, so
    that e.g. ``a=1 OR (b=2 OR c=3)`` becomes a single block with three items.
    """

    # logical operator joining the items ("AND" or "OR" when built by flatten_conditions)
    op: str
    # members of the block: condition AST nodes or nested ConditionBlock objects
    items: list
class KnowledgeBaseQueryExecutor:
    """
    Plans and executes a SELECT query against a knowledge base (KB).

    A complex WHERE clause is split into several simple ``kb.select`` calls whose results are
    combined: results of OR blocks are unioned, results of AND blocks are intersected.
    Conditions on the content column are handled specially: positive ones (=, LIKE, IN) become
    the search condition, negative ones (!=, <>, NOT LIKE, NOT IN) are turned into an
    ``id NOT IN (...)`` filter.
    """

    # content operators that can only be executed as an exclusion of IDs (see to_excluded_ids)
    _NEGATIVE_CONTENT_OPS = ("!=", "<>", "NOT LIKE", "NOT IN")

    def __init__(self, kb, content_column="content", id_column="chunk_id"):
        """
        :param kb: knowledge base; only its ``select(query, ...)`` method is used here
        :param content_column: name of the content column (matched case-insensitively)
        :param id_column: name of the column holding chunk IDs
        """
        self.kb = kb
        self.content_column = content_column.lower()
        self.id_column = id_column
        # default LIMIT for every KB call; taken from the query in run()
        self.limit = None
        # how many best matches of a negative content condition are looked up (and excluded)
        self._negative_set_size = 100
        # minimal relevance for a chunk to count as a match of a negative content condition
        self._negative_set_threshold = 0.5

    def is_content_condition(self, node: "ASTNode") -> bool:
        """
        Checks if the node is a binary condition on the content column.

        Side effects on the node (callers rely on them): a ``chunk_content`` identifier is
        renamed to the content column, and for LIKE-style operators leading/trailing spaces and
        '%' are stripped from a string constant, because LIKE on content is executed as '='.

        :param node: condition to check
        :return: True if the left argument is the unqualified content column
        """
        if not isinstance(node, BinaryOperation) or not isinstance(node.args[0], Identifier):
            return False

        parts = node.args[0].parts
        # map chunk_content to content
        if parts[0].lower() == "chunk_content":
            parts[0] = self.content_column

        if len(parts) != 1 or parts[0].lower() != self.content_column:
            return False

        if "LIKE" in node.op.upper():
            # remove '%'
            arg = node.args[1]
            if isinstance(arg, Constant) and isinstance(arg.value, str):
                arg.value = arg.value.strip(" %")
        return True

    @staticmethod
    def invert_content_op(node: "BinaryOperation") -> "BinaryOperation":
        """
        Replaces the operator of a content condition with its negation, in place.

        LIKE is negated to '!=' (and NOT LIKE to '=') because LIKE on content is executed as '='.

        :param node: condition to modify
        :return: the same node
        :raises NotImplementedError: the operator has no supported negation
        """
        op_map = {
            "=": "!=",
            "!=": "=",
            "<>": "=",
            "LIKE": "!=",
            "NOT LIKE": "=",
            "IN": "NOT IN",
            "NOT IN": "IN",
        }
        inverted = op_map.get(node.op.upper())
        if inverted is None:
            raise NotImplementedError(f"Can't handle condition: '{str(node)}'")
        node.op = inverted
        return node

    def convert_unary_ops(
        self, node: "ASTNode", callstack: List["ASTNode"], **kwargs
    ) -> Optional["ASTNode"]:
        """
        Tries to remove unary operator and apply it to Binary operation.
        Supported cases:
        - "NOT content <op> value" => "content <!op> value"
        - "content <op> NOT value" => "content <!op> value"

        Where <!op> is inverted operator of <op>.

        Used as a ``query_traversal`` callback.

        :param node: current node
        :param callstack: ancestors of the node; ``callstack[0]`` is used as the parent. It can be
            empty (e.g. for the root of the WHERE clause).
        :param kwargs: other arguments passed by query_traversal (ignored)
        :return: replacement node, or None when nothing is replaced
        """
        if not isinstance(node, UnaryOperation) or node.op.upper() != "NOT":
            return None

        # 1. NOT content <op> value
        if self.is_content_condition(node.args[0]):
            self.invert_content_op(node.args[0])
            return node.args[0]

        # 2. content <op> NOT value: the parent is the content condition.
        # Guard the empty callstack: a root-level NOT has no parent.
        if callstack and self.is_content_condition(callstack[0]):
            self.invert_content_op(callstack[0])
            return node.args[0]

        return None

    def union(self, results: List[pd.DataFrame]) -> pd.DataFrame:
        """
        Combines dataframes from the input list into one, keeping the first row for every ID.

        :param results: dataframes to combine (at least one)
        :return: combined dataframe with a fresh 0..n-1 index
        """
        if len(results) == 1:
            return results[0]

        combined = pd.concat(results)
        # drop=True: don't turn the old index into an 'index' column. Besides polluting the
        # output, such a column makes a later reset_index() on this result fail because
        # 'index' already exists (nested OR/AND/OR blocks).
        return combined.drop_duplicates(subset=[self.id_column]).reset_index(drop=True)

    def intersect(self, results: List[pd.DataFrame]) -> pd.DataFrame:
        """
        Intersects dataframes from the input list by ID.

        Rows (and their order) come from the first dataframe; a row is kept only if its ID is
        present in every other dataframe.

        :param results: dataframes to intersect (at least one)
        :return: filtered first dataframe
        """
        df = results[0]
        for other in results[1:]:
            df = df[df[self.id_column].isin(other[self.id_column])]
        return df

    @classmethod
    def flatten_conditions(cls, node: "ASTNode") -> Union["ASTNode", "ConditionBlock"]:
        """
        Recursively inspect conditions tree and move conditions related to 'OR' or 'AND' operators of the same level
        to same ConditionBlock
        Example: or (a=1, or (b=2, c=3))
        is converted to: ConditionBlock(or, [a=1, b=2, c=3])

        BETWEEN is expanded to an AND block of '>=' and '<='. Operators of leaf conditions are
        upper-cased in place.

        :param node: root of the conditions tree
        :return: leaf condition or ConditionBlock
        :raises NotImplementedError: unsupported node type (e.g. a remaining unary NOT)
        """
        if isinstance(node, BinaryOperation):
            op = node.op.upper()
            if op not in ("AND", "OR"):
                # leaf condition: normalize operator case for later comparisons
                node.op = op
                return node

            block = ConditionBlock(op, [])
            for arg in node.args:
                item = cls.flatten_conditions(arg)
                if isinstance(item, ConditionBlock) and item.op == block.op:
                    # same operator on the next level: merge into this block
                    block.items.extend(item.items)
                else:
                    # leaf condition or a block of the other type
                    block.items.append(item)
            return block

        if isinstance(node, BetweenOperation):
            # x BETWEEN a AND b  =>  x >= a AND x <= b
            return ConditionBlock(
                "AND",
                [
                    BinaryOperation(">=", args=[node.args[0], node.args[1]]),
                    BinaryOperation("<=", args=[node.args[0], node.args[2]]),
                ],
            )

        raise NotImplementedError(f"Unknown node '{node}'")

    def call_kb(
        self,
        conditions: List["BinaryOperation"],
        disable_reranking: bool = False,
        limit: Optional[int] = None,
    ) -> pd.DataFrame:
        """
        Call KB with list of prepared conditions

        Conditions are joined with AND. A ``metadata->'key'`` / ``metadata->>'key'`` left
        argument is replaced in place by a plain ``key`` identifier.

        :param conditions: input conditions
        :param disable_reranking: flag disable reranking
        :param limit: use custom limit; defaults to ``self.limit``
        :return: result of querying KB
        """
        where = None
        for condition in conditions:
            arg0 = condition.args[0]

            # is it json operator on metadata
            if isinstance(arg0, BinaryOperation) and arg0.op in ("->", "->>"):
                op_arg0, op_arg1 = arg0.args
                if (
                    isinstance(op_arg0, Identifier)
                    and isinstance(op_arg1, Constant)
                    and op_arg0.parts[-1].lower() == "metadata"
                ):
                    # replace to metadata column
                    condition.args[0] = Identifier(parts=[op_arg1.value])

            where = condition if where is None else BinaryOperation("AND", args=[where, condition])

        query = Select(targets=[Star()], where=where)

        if limit is None:
            limit = self.limit
        if limit is not None:
            query.limit = Constant(limit)

        return self.kb.select(query, disable_reranking=disable_reranking)

    def execute_content_condition(
        self,
        content_condition: "BinaryOperation",
        other_conditions: Optional[List["BinaryOperation"]] = None,
        disable_reranking: bool = False,
        limit: Optional[int] = None,
    ) -> pd.DataFrame:
        """
        Call KB using content condition. Only positive conditions for content can be here.
        Negative conditions can be only as filter of ID

        :param content_condition: condition for Content column
        :param other_conditions: conditions for other columns
        :param disable_reranking: turn off reranking
        :param limit: override default limit
        :return: result of the query
        :raises NotImplementedError: unsupported operator
        """
        if other_conditions is None:
            other_conditions = []
        op = content_condition.op

        if op == "IN":
            # (select where content = 'a') UNION (select where content = 'b')
            results = [
                self.call_kb(
                    [BinaryOperation(op="=", args=[Identifier(self.content_column), el])] + other_conditions,
                    disable_reranking=disable_reranking,
                    limit=limit,
                )
                for el in content_condition.args[1].items
            ]
            return self.union(results)

        if op in ("=", "LIKE"):
            # LIKE is executed as '=' (its wildcards are stripped by is_content_condition)
            condition = copy.deepcopy(content_condition)
            condition.op = "="
            return self.call_kb(
                [condition] + other_conditions, disable_reranking=disable_reranking, limit=limit
            )

        if op == "IS" and isinstance(content_condition.args[1], NullConstant):
            # return empty dataset, call to get column names
            return self.call_kb([], limit=1)[:0]

        if op == "IS NOT" and isinstance(content_condition.args[1], NullConstant):
            # no restriction on content: only the other conditions apply
            return self.call_kb(other_conditions, disable_reranking=disable_reranking, limit=limit)

        raise NotImplementedError(
            f'Operator "{content_condition.op}" is not supported for condition: {content_condition}'
        )

    @staticmethod
    def to_include_content(content_condition: "BinaryOperation") -> Optional[List[str]]:
        """
        Handles positive conditions for content. Returns list of content values

        :param content_condition: condition for Content column
        :return: values for '=', 'LIKE' and 'IN'; None for any other operator
        """
        if content_condition.op == "IN":
            return [item.value for item in content_condition.args[1].items]
        if content_condition.op in ("=", "LIKE"):
            return [content_condition.args[1].value]
        return None

    def to_excluded_ids(
        self, content_condition: "BinaryOperation", other_conditions: List["BinaryOperation"]
    ) -> Optional[List[str]]:
        """
        Handles negative conditions for content. If it is negative condition: extract and return list of IDs
        that have to be excluded by parent query

        The IDs are the best matches (at most ``_negative_set_size``, relevance at least
        ``_negative_set_threshold``) of the positive version of the condition.

        :param content_condition: condition for Content column
        :param other_conditions: conditions for other columns
        :return: list of IDs to exclude or None
        """
        op = content_condition.op
        if op not in self._NEGATIVE_CONTENT_OPS:
            return None

        threshold = BinaryOperation(op=">=", args=[Identifier("relevance"), Constant(self._negative_set_threshold)])

        if op == "NOT IN":
            # id NOT IN (
            #   select id where content in ('a', 'b')
            # )
            positive = copy.deepcopy(content_condition)
            positive.op = "IN"
            res = self.execute_content_condition(
                positive,
                other_conditions + [threshold],
                disable_reranking=True,
                limit=self._negative_set_size,
            )
        else:
            # id NOT IN (
            #   SELECT id FROM kb WHERE content = '...' limit X
            # )
            positive = BinaryOperation(op="=", args=content_condition.args)
            res = self.call_kb(
                [positive, threshold] + other_conditions, disable_reranking=True, limit=self._negative_set_size
            )

        return list(res[self.id_column])

    def execute_blocks(self, block: Union["ConditionBlock", "BinaryOperation"]) -> pd.DataFrame:
        """
        Split block to set of calls with conditions and execute them. Nested blocks are supported

        :param block: ConditionBlock or a single condition
        :return: dataframe with result of block execution
        :raises NotImplementedError: block operator is neither AND nor OR
        """
        if not isinstance(block, ConditionBlock):
            # single condition
            if not self.is_content_condition(block):
                return self.call_kb([block])
            if block.op in self._NEGATIVE_CONTENT_OPS:
                # a negative content condition can only be executed as an exclusion of IDs,
                # which is implemented for AND blocks
                return self._execute_and_block(ConditionBlock("AND", [block]))
            return self.execute_content_condition(block)

        if block.op == "AND":
            return self._execute_and_block(block)

        if block.op == "OR":
            return self.union([self.execute_blocks(item) for item in block.items])

        raise NotImplementedError(f"Unknown condition block: {block.op}")

    def _execute_and_block(self, block: "ConditionBlock") -> pd.DataFrame:
        """
        Executes an AND block: runs its parts and intersects their results.

        - nested blocks are executed recursively;
        - negative content conditions become an ``id NOT IN (...)`` filter;
        - positive content values are joined with ' AND ' into one search condition;
        - other conditions filter every KB call of this block.

        :param block: ConditionBlock with op 'AND'
        :return: intersection of the results
        """
        results = []
        content_filters, other_filters = [], []
        for item in block.items:
            if isinstance(item, ConditionBlock):
                results.append(self.execute_blocks(item))
            elif self.is_content_condition(item):
                content_filters.append(item)
            else:
                other_filters.append(item)

        # dicts are used as insertion-ordered sets: with a set, the order of the joined search
        # text would depend on string hash randomization
        exclude_ids = {}
        include_contents = {}
        unhandled_filters = []
        for condition in content_filters:
            ids = self.to_excluded_ids(condition, other_filters)
            if ids is not None:
                exclude_ids.update(dict.fromkeys(ids))
                continue
            contents = self.to_include_content(condition)
            if contents is not None:
                include_contents.update(dict.fromkeys(contents))
            else:
                # keep origin content filter
                unhandled_filters.append(condition)

        if exclude_ids:
            # add to filter
            values = [Constant(i) for i in exclude_ids]
            other_filters.append(BinaryOperation(op="NOT IN", args=[Identifier(self.id_column), Tuple(values)]))

        # execute content filters
        if include_contents:
            content = " AND ".join(include_contents)
            results.append(
                self.execute_content_condition(
                    BinaryOperation(op="=", args=[Identifier(self.content_column), Constant(content)]),
                    other_filters,
                )
            )
        for condition in unhandled_filters:
            results.append(self.execute_content_condition(condition, other_filters))

        # Without a content query, the remaining filters (including the ID exclusion) still have
        # to be applied, and there must be at least one result to intersect.
        ran_content_query = bool(include_contents or unhandled_filters)
        if not ran_content_query and (other_filters or not results):
            results.append(self.call_kb(other_filters))

        return self.intersect(results)

    def run(self, query: "Select") -> pd.DataFrame:
        """
        Plan and execute query to KB. If query has complex conditions:
          - convert them to several queries with simple conditions, execute them and combine results

        Stages:
          - Remove unary NOT from condition: try to apply it to related operator
          - Flat conditions tree: convert into condition blocks:
            - having with same operators of the same levels in the same block
          - Recursively execute blocks
            - get data from OR blocks and union them
            - get data from AND blocks and intersect them

        :param query: select query
        :return: results
        """
        if query.where is None:
            return self.kb.select(query)

        # When the callback replaces the root of WHERE itself (e.g. "WHERE NOT content = 'x'"),
        # the replacement can only come back as the return value of query_traversal
        # (assumed to be None when the root is kept) - write it back to the query.
        where = query_traversal(query.where, self.convert_unary_ops)
        if where is not None:
            query.where = where

        blocks_tree = self.flatten_conditions(query.where)
        # reset on every run so that a LIMIT of a previous query does not leak into this one
        self.limit = query.limit.value if query.limit is not None else None
        return self.execute_blocks(blocks_tree)