Coverage for mindsdb / interfaces / knowledge_base / executor.py: 52%

190 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from dataclasses import dataclass 

2import copy 

3from typing import List, Optional, Union 

4 

5from mindsdb_sql_parser.ast import ( 

6 BinaryOperation, 

7 Identifier, 

8 Constant, 

9 UnaryOperation, 

10 Select, 

11 Star, 

12 Tuple, 

13 ASTNode, 

14 BetweenOperation, 

15 NullConstant, 

16) 

17import pandas as pd 

18 

19from mindsdb.integrations.utilities.query_traversal import query_traversal 

20 

21 

22@dataclass 

23class ConditionBlock: 

24 op: str 

25 items: list 

26 

27 

28class KnowledgeBaseQueryExecutor: 

29 def __init__(self, kb, content_column="content", id_column="chunk_id"): 

30 self.kb = kb 

31 self.content_column = content_column.lower() 

32 self.id_column = id_column 

33 self.limit = None 

34 self._negative_set_size = 100 

35 self._negative_set_threshold = 0.5 

36 

37 def is_content_condition(self, node: ASTNode) -> bool: 

38 """ 

39 Checks if the node is a condition to Content column 

40 

41 :param node: condition to check 

42 """ 

43 if isinstance(node, BinaryOperation): 43 ↛ 59line 43 didn't jump to line 59 because the condition on line 43 was always true

44 if isinstance(node.args[0], Identifier): 44 ↛ 59line 44 didn't jump to line 59 because the condition on line 44 was always true

45 parts = node.args[0].parts 

46 

47 # map chunk_content to content 

48 if parts[0].lower() == "chunk_content": 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true

49 parts[0] = self.content_column 

50 

51 if len(parts) == 1 and parts[0].lower() == self.content_column: 

52 if "LIKE" in node.op.upper(): 52 ↛ 54line 52 didn't jump to line 54 because the condition on line 52 was never true

53 # remove '%' 

54 arg = node.args[1] 

55 if isinstance(arg, Constant) and isinstance(arg.value, str): 

56 arg.value = arg.value.strip(" %") 

57 

58 return True 

59 return False 

60 

61 @staticmethod 

62 def invert_content_op(node: BinaryOperation) -> BinaryOperation: 

63 # Change operator of binary operation to opposite one 

64 op_map = {"=": "!=", "!=": "=", "LIKE": "!=", "NOT LIKE": "=", "IN": "NOT IN", "NOT IN": "IN"} 

65 if node.op.upper() not in op_map: 

66 raise NotImplementedError(f"Can't handle condition: '{str(node)}'") 

67 node.op = op_map[node.op.upper()] 

68 return node 

69 

70 def convert_unary_ops(self, node: ASTNode, callstack: List[ASTNode], **kwargs) -> ASTNode: 

71 """ 

72 Tries to remove unary operator and apply it to Binary operation. 

73 Supported cases: 

74 - "NOT content <op> value" => "content <!op> value" 

75 - "content <op> NOT value" => "content <!op> value" 

76 

77 Where <!op> is inverted operator of <op> 

78 """ 

79 

80 if isinstance(node, UnaryOperation): 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true

81 if node.op.upper() == "NOT": 

82 # two options: 

83 # 1. NOT content <op> value 

84 if self.is_content_condition(node.args[0]): 

85 self.invert_content_op(node.args[0]) 

86 return node.args[0] 

87 

88 # 2. content <op> NOT value 

89 if self.is_content_condition(callstack[0]): 

90 self.invert_content_op(callstack[0]) 

91 return node.args[0] 

92 

93 def union(self, results: List[pd.DataFrame]) -> pd.DataFrame: 

94 # combine dataframes from input list to single one 

95 

96 if len(results) == 1: 

97 return results[0] 

98 

99 res = pd.concat(results) 

100 df = res.drop_duplicates(subset=[self.id_column]).reset_index() 

101 return df 

102 

103 def intersect(self, results: List[pd.DataFrame]) -> pd.DataFrame: 

104 # intersect dataframes from input list: return dataframe with rows that exist in all input dataframes 

105 

106 if len(results) == 1: 106 ↛ 109line 106 didn't jump to line 109 because the condition on line 106 was always true

107 return results[0] 

108 

109 item = results[0] 

110 for item2 in results[1:]: 

111 item = item[item[self.id_column].isin(item2[self.id_column])] 

112 

113 df = item 

114 return df 

115 

116 @classmethod 

117 def flatten_conditions(cls, node: ASTNode) -> Union[ASTNode, ConditionBlock]: 

118 """ 

119 Recursively inspect conditions tree and move conditions related to 'OR' or 'AND' operators of the same level 

120 to same ConditionBlock 

121 Example: or (a=1, or (b=2, c=3)) 

122 is converted to: ConditionBlock(or, [a=1, b=2, c=3]) 

123 """ 

124 

125 if isinstance(node, BinaryOperation): 125 ↛ 144line 125 didn't jump to line 144 because the condition on line 125 was always true

126 op = node.op.upper() 

127 if op in ("AND", "OR"): 

128 block = ConditionBlock(op, []) 

129 for arg in node.args: 

130 item = cls.flatten_conditions(arg) 

131 if isinstance(item, ConditionBlock): 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true

132 if item.op == block.op: 

133 block.items.extend(item.items) 

134 else: 

135 # new type of block 

136 block.items.append(item) 

137 else: 

138 block.items.append(item) 

139 return block 

140 else: 

141 node.op = node.op.upper() 

142 return node 

143 

144 elif isinstance(node, BetweenOperation): 

145 block = ConditionBlock( 

146 "AND", 

147 [ 

148 BinaryOperation(">=", args=[node.args[0], node.args[1]]), 

149 BinaryOperation("<=", args=[node.args[0], node.args[2]]), 

150 ], 

151 ) 

152 return block 

153 

154 raise NotImplementedError(f"Unknown node '{node}'") 

155 

156 def call_kb( 

157 self, conditions: List[BinaryOperation], disable_reranking: bool = False, limit: int = None 

158 ) -> pd.DataFrame: 

159 """ 

160 Call KB with list of prepared conditions 

161 

162 :param conditions: input conditions 

163 :param disable_reranking: flag disable reranking 

164 :param limit: use custom limit 

165 :return: result of querying KB 

166 """ 

167 

168 where = None 

169 for condition in conditions: 

170 arg0 = condition.args[0] 

171 

172 # is it json operator on metadata 

173 if isinstance(arg0, BinaryOperation) and arg0.op in ("->", "->>"): 173 ↛ 174line 173 didn't jump to line 174 because the condition on line 173 was never true

174 op_arg0, op_arg1 = arg0.args 

175 if ( 

176 isinstance(op_arg0, Identifier) 

177 and isinstance(op_arg1, Constant) 

178 and op_arg0.parts[-1].lower() == "metadata" 

179 ): 

180 # replace to metadata column 

181 condition.args[0] = Identifier(parts=[op_arg1.value]) 

182 

183 if where is None: 

184 where = condition 

185 else: 

186 where = BinaryOperation("AND", args=[where, condition]) 

187 

188 query = Select(targets=[Star()], where=where) 

189 

190 if limit is not None: 190 ↛ 191line 190 didn't jump to line 191 because the condition on line 190 was never true

191 query.limit = Constant(limit) 

192 elif self.limit is not None: 192 ↛ 193line 192 didn't jump to line 193 because the condition on line 192 was never true

193 query.limit = Constant(self.limit) 

194 

195 return self.kb.select(query, disable_reranking=disable_reranking) 

196 

197 def execute_content_condition( 

198 self, 

199 content_condition: BinaryOperation, 

200 other_conditions: List[BinaryOperation] = None, 

201 disable_reranking: bool = False, 

202 limit: int = None, 

203 ) -> pd.DataFrame: 

204 """ 

205 Call KB using content condition. Only positive conditions for content can be here. 

206 Negative conditions can be only as filter of ID 

207 :param content_condition: condition for Content column 

208 :param other_conditions: conditions for other columns 

209 :param disable_reranking: turn off reranking 

210 :param limit: override default limit 

211 :return: result of the query 

212 """ 

213 

214 if other_conditions is None: 214 ↛ 215line 214 didn't jump to line 215 because the condition on line 214 was never true

215 other_conditions = [] 

216 

217 if content_condition.op == "IN": 217 ↛ 219line 217 didn't jump to line 219 because the condition on line 217 was never true

218 # (select where content = ‘a’) UNION (select where content = ‘b’) 

219 results = [] 

220 for el in content_condition.args[1].items: 

221 el_cond = BinaryOperation(op="=", args=[Identifier(self.content_column), el]) 

222 results.append( 

223 self.call_kb([el_cond] + other_conditions, disable_reranking=disable_reranking, limit=limit) 

224 ) 

225 return self.union(results) 

226 

227 elif content_condition.op in ("=", "LIKE"): 227 ↛ 233line 227 didn't jump to line 233 because the condition on line 227 was always true

228 # just '=' 

229 content_condition2 = copy.deepcopy(content_condition) 

230 content_condition2.op = "=" 

231 return self.call_kb([content_condition2] + other_conditions) 

232 

233 elif content_condition.op == "IS" and isinstance(content_condition.args[1], NullConstant): 

234 # return empty dataset, call to get column names 

235 return self.call_kb([], limit=1)[:0] 

236 elif content_condition.op == "IS NOT" and isinstance(content_condition.args[1], NullConstant): 

237 # execute without conditions 

238 return self.call_kb([]) 

239 else: 

240 raise NotImplementedError( 

241 f'Operator "{content_condition.op}" is not supported for condition: {content_condition}' 

242 ) 

243 

244 @staticmethod 

245 def to_include_content(content_condition: BinaryOperation) -> List[str]: 

246 """ 

247 Handles positive conditions for content. Returns list of content values 

248 """ 

249 if content_condition.op == "IN": 249 ↛ 250line 249 didn't jump to line 250 because the condition on line 249 was never true

250 return [item.value for item in content_condition.args[1].items] 

251 

252 elif content_condition.op in ("=", "LIKE"): 252 ↛ exitline 252 didn't return from function 'to_include_content' because the condition on line 252 was always true

253 return [content_condition.args[1].value] 

254 

255 def to_excluded_ids( 

256 self, content_condition: BinaryOperation, other_conditions: List[BinaryOperation] 

257 ) -> Optional[List[str]]: 

258 """ 

259 Handles negative conditions for content. If it is negative condition: extract and return list of IDs 

260 that have to be excluded by parent query 

261 

262 :param content_condition: condition for Content column 

263 :param other_conditions: conditions for other columns 

264 :return: list of IDs to exclude or None 

265 """ 

266 

267 if content_condition.op in ("!=", "<>", "NOT LIKE"): 267 ↛ 271line 267 didn't jump to line 271 because the condition on line 267 was never true

268 # id NOT IN ( 

269 # SELECT id FROM kb WHERE content =’...’ limit X 

270 # ) 

271 el_cond = BinaryOperation(op="=", args=content_condition.args) 

272 threshold = BinaryOperation(op=">=", args=[Identifier("relevance"), Constant(self._negative_set_threshold)]) 

273 res = self.call_kb( 

274 [el_cond, threshold] + other_conditions, disable_reranking=True, limit=self._negative_set_size 

275 ) 

276 

277 return list(res[self.id_column]) 

278 

279 elif content_condition.op == "NOT IN": 279 ↛ 283line 279 didn't jump to line 283 because the condition on line 279 was never true

280 # id NOT IN ( 

281 # select id where content in (‘a’, ‘b’) 

282 # ) 

283 content_condition2 = copy.deepcopy(content_condition) 

284 content_condition2.op = "IN" 

285 

286 threshold = BinaryOperation(op=">=", args=[Identifier("relevance"), Constant(self._negative_set_threshold)]) 

287 res = self.execute_content_condition( 

288 content_condition2, 

289 other_conditions + [threshold], 

290 disable_reranking=True, 

291 limit=self._negative_set_size, 

292 ) 

293 

294 return list(res[self.id_column]) 

295 else: 

296 return None 

297 

298 def execute_blocks(self, block: ConditionBlock) -> pd.DataFrame: 

299 """ 

300 Split block to set of calls with conditions and execute them. Nested blocks are supported 

301 

302 :param block: 

303 :return: dataframe with result of block execution 

304 """ 

305 

306 if not isinstance(block, ConditionBlock): 306 ↛ 308line 306 didn't jump to line 308 because the condition on line 306 was never true

307 # single condition 

308 if self.is_content_condition(block): 

309 return self.execute_content_condition(block) 

310 else: 

311 return self.call_kb([block]) 

312 

313 if block.op == "AND": 313 ↛ 364line 313 didn't jump to line 364 because the condition on line 313 was always true

314 results = [] 

315 

316 content_filters, other_filters = [], [] 

317 for item in block.items: 

318 if isinstance(item, ConditionBlock): 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true

319 results.append(self.execute_blocks(item)) 

320 else: 

321 if self.is_content_condition(item): 

322 content_filters.append(item) 

323 else: 

324 other_filters.append(item) 

325 if len(content_filters) > 0: 325 ↛ 359line 325 didn't jump to line 359 because the condition on line 325 was always true

326 content_filters2 = [] 

327 exclude_ids = set() 

328 include_contents = set() 

329 # exclude content conditions 

330 for condition in content_filters: 

331 ids = self.to_excluded_ids(condition, other_filters) 

332 if ids is not None: 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true

333 exclude_ids.update(ids) 

334 continue 

335 contents = self.to_include_content(condition) 

336 if contents is not None: 336 ↛ 341line 336 didn't jump to line 341 because the condition on line 336 was always true

337 include_contents.update(contents) 

338 continue 

339 else: 

340 # keep origin content filter 

341 content_filters2.append(condition) 

342 

343 if exclude_ids: 343 ↛ 345line 343 didn't jump to line 345 because the condition on line 343 was never true

344 # add to filter 

345 values = [Constant(i) for i in exclude_ids] 

346 condition = BinaryOperation(op="NOT IN", args=[Identifier(self.id_column), Tuple(values)]) 

347 other_filters.append(condition) 

348 # execute content filters 

349 if include_contents: 349 ↛ 356line 349 didn't jump to line 356 because the condition on line 349 was always true

350 content = " AND ".join(include_contents) 

351 result = self.execute_content_condition( 

352 BinaryOperation(op="=", args=[Identifier(self.content_column), Constant(content)]), 

353 other_filters, 

354 ) 

355 results.append(result) 

356 for condition in content_filters2: 356 ↛ 357line 356 didn't jump to line 357 because the loop on line 356 never started

357 result = self.execute_content_condition(condition, other_filters) 

358 results.append(result) 

359 elif len(other_filters) > 0: 

360 results.append(self.call_kb(other_filters)) 

361 

362 return self.intersect(results) 

363 

364 elif block.op == "OR": 

365 results = [] 

366 for item in block.items: 

367 results.append(self.execute_blocks(item)) 

368 

369 return self.union(results) 

370 

371 def run(self, query: Select) -> pd.DataFrame: 

372 """ 

373 Plan and execute query to KB. If query has complex conditions: 

374 - convert them to several queries with simple conditions, execute them and combine results 

375 

376 Stages: 

377 - Remove unary NOT from condition: try to apply it to related operator 

378 - Flat conditions tree: convert into condition blocks: 

379 - having with same operators of the same levels in the same block 

380 - Recursively execute blocks 

381 - get data from OR blocks and union them 

382 - get data from AND blocks and intersect them 

383 

384 :param query: select query 

385 :return: results 

386 """ 

387 if query.where is not None: 387 ↛ 394line 387 didn't jump to line 394 because the condition on line 387 was always true

388 query_traversal(query.where, self.convert_unary_ops) 

389 blocks_tree = self.flatten_conditions(query.where) 

390 if query.limit is not None: 390 ↛ 391line 390 didn't jump to line 391 because the condition on line 390 was never true

391 self.limit = query.limit.value 

392 return self.execute_blocks(blocks_tree) 

393 else: 

394 return self.kb.select(query)