Coverage for mindsdb/interfaces/knowledge_base/evaluate.py: 11%
291 statements (coverage.py v7.13.1, created at 2026-01-21 00:36 +0000)

import json
import math
import re
import time
import copy
from typing import List, Tuple

import pandas as pd
import datetime as dt

from mindsdb.api.executor.sql_query.result_set import ResultSet
from mindsdb_sql_parser import Identifier, Select, Constant, Star, parse_sql, BinaryOperation
from mindsdb.utilities import log
from mindsdb.utilities.config import config

from mindsdb.interfaces.knowledge_base.llm_client import LLMClient

logger = log.getLogger(__name__)


GENERATE_QA_SYSTEM_PROMPT = """
Your task is to generate question and answer pairs for a search engine.
The search engine will take your query and return a list of documents.
You will be given a text and you need to generate a question that can be answered using the information in the text.
Your questions will be used to evaluate the search engine.
A question should always have enough clues to identify the specific text it was generated from.
Never ask questions like "What license number is associated with Amend 6" because Amend 6 could be found in many documents and the question is not specific enough.
Example output 1: {\"query\": \"What processor does the HP 2023 14\" FHD IPS Laptop use?\", \"reference_answer\": \"Ryzen 3 5300U\"}
Example output 2: {\"query\": \"What is the name of the river in Paris?\", \"reference_answer\": \"Seine\"}
Don't generate questions like "What is being amended in the application?" because such questions cannot be answered without knowing which document they refer to.
The question should be answerable without the text, but the answer should be present in the text.
Return ONLY a json response. No other text.
"""


def calc_entropy(values: List[float]) -> float:
    """
    Alternative to scipy.stats.entropy, to avoid adding a `scipy` dependency
    :param values: Input distribution
    :return: The calculated entropy.
    """
    # normalize & filter
    total = sum(values)
    values = [i / total for i in values if i > 0]
    # calc
    return -sum([pk * math.log(pk) for pk in values])
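
# Illustrative values (not part of the source): entropy peaks for a uniform
# distribution and is zero when all the mass sits on a single item, e.g.
#   calc_entropy([1, 1, 1, 1])  -> log(4) ≈ 1.386
#   calc_entropy([1, 0, 0, 0])  -> 0.0  (zero entries are filtered out)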

def sanitize_json_response(response: str) -> str:
    """Remove markdown code block formatting from JSON response and extract valid JSON."""
    if not response or not response.strip():
        raise ValueError("Empty response provided.")

    # Remove leading/trailing whitespace
    response = response.strip()

    # Remove markdown code block markers if present
    response = re.sub(r"^```(?:json|JSON)?\s*", "", response, flags=re.MULTILINE)
    response = re.sub(r"\s*```$", "", response, flags=re.MULTILINE)
    response = response.strip()

    # Find the first opening brace
    start_idx = response.find("{")
    if start_idx == -1:
        raise ValueError("No JSON object found in the response.")

    # Try to parse JSON starting from the first "{" with decreasing end positions.
    # This handles nested objects and strings with braces correctly.
    for end_idx in range(len(response), start_idx, -1):  # Start from the end and work backwards
        candidate = response[start_idx:end_idx]
        try:
            parsed = json.loads(candidate)
            # Ensure it's a dictionary (object) not just any valid JSON
            if isinstance(parsed, dict):
                return candidate
        except json.JSONDecodeError:
            continue

    raise ValueError("No valid JSON object found in the response.")
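
# Illustrative call (not part of the source): markdown fences around a model reply
# are stripped before parsing, e.g.
#   sanitize_json_response('```json\n{"query": "Q?", "reference_answer": "A"}\n```')
#   -> '{"query": "Q?", "reference_answer": "A"}'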

class EvaluateBase:
    DEFAULT_QUESTION_COUNT = 20
    DEFAULT_SAMPLE_SIZE = 10000

    def __init__(self, session, knowledge_base):
        self.kb = knowledge_base
        self.name = knowledge_base._kb.name
        self.session = session

        self.llm_client = None

    def generate(self, sampled_df: pd.DataFrame) -> pd.DataFrame:
        # generate test data from the sample
        raise NotImplementedError

    def evaluate(self, test_data: pd.DataFrame) -> pd.DataFrame:
        # create the evaluation metrics from the test data
        raise NotImplementedError

    def _set_llm_client(self, llm_params: dict):
        """
        Logic to get the LLM settings:
        - first use the `llm` option of the 'evaluate' command
        - if not defined, fall back to the knowledge base reranker config
        """
        if llm_params is None:
            llm_params = self.kb._kb.params.get("reranking_model")

        params = copy.deepcopy(config.get("default_llm", {}))

        if llm_params:
            params.update(llm_params)

        self.llm_client = LLMClient(params)
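
    # Illustrative merge (assumption: "provider" and "model_name" are typical LLM
    # parameter keys, not taken from this file): with a server-level
    # default_llm = {"provider": "openai", "model_name": "gpt-4o"} and the evaluate
    # command passing llm = {"model_name": "gpt-4.1"}, the client is built with
    # {"provider": "openai", "model_name": "gpt-4.1"}; the KB "reranking_model"
    # config is only consulted when no `llm` parameter is given.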

    def generate_test_data(self, gen_params: dict) -> pd.DataFrame:
        # Extract the source data (from the user's query or from the KB itself) and call `generate` to get test data

        if "from_sql" in gen_params:
            # get data from sql
            query = parse_sql(gen_params["from_sql"])
            if not isinstance(query, Select) or query.from_table is None:
                raise ValueError(f"Query is not supported: {gen_params['from_sql']}")

            dn, table_name = self._get_dn_table(query.from_table)
            query.from_table = table_name
            if query.limit is None:
                query.limit = Constant(self.DEFAULT_SAMPLE_SIZE)

            response = dn.query(query=query, session=self.session)
            df = response.data_frame

            if "content" not in df.columns:
                raise ValueError(f"`content` column isn't found in the provided sql: {gen_params['from_sql']}")

            df.rename(columns={"content": "chunk_content"}, inplace=True)
        else:
            # get data from the knowledge base
            df = self.kb.select_query(
                Select(
                    targets=[Identifier("chunk_content"), Identifier("id")], limit=Constant(self.DEFAULT_SAMPLE_SIZE)
                )
            )

        if "count" in gen_params:
            number_of_questions = gen_params["count"]
        else:
            number_of_questions = self.DEFAULT_QUESTION_COUNT

        number_of_questions = min(number_of_questions, len(df))
        sampled_df = df.sample(n=number_of_questions)

        return self.generate(sampled_df)
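
    # Illustrative `generate_data` parameters (assumption; the keys come from the code
    # above, the table name is hypothetical):
    #   {"from_sql": "SELECT content FROM my_files.docs", "count": 50}
    # samples 50 rows from the SQL result, while an empty dict samples up to
    # DEFAULT_QUESTION_COUNT chunks directly from the knowledge base.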

    def read_from_table(self, test_table: Identifier) -> pd.DataFrame:
        # read data from table

        dn, table_name = self._get_dn_table(test_table)

        query = Select(
            targets=[Star()],
            from_table=table_name,
        )
        response = dn.query(query=query, session=self.session)
        return response.data_frame

    def _get_dn_table(self, table_name: Identifier):
        if len(table_name.parts) < 2:
            raise ValueError(f"Can't find database, table name must have at least 2 parts: {table_name}")

        integration_name = table_name.parts[0]
        table_name = Identifier(parts=table_name.parts[1:])
        dn = self.session.datahub.get(integration_name)
        if dn is None:
            raise ValueError(f"Can't find database: {integration_name}")
        return dn, table_name
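
    # Illustrative split (assumption: a dotted name such as "my_pg.public.docs"):
    # parts are ["my_pg", "public", "docs"], so "my_pg" selects the datanode and
    # Identifier(parts=["public", "docs"]) is passed on as the table name.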

    def save_to_table(self, table_name: Identifier, df: pd.DataFrame, is_replace=False):
        # save data to table

        dn, table_name = self._get_dn_table(table_name)

        data = ResultSet.from_df(df)

        dn.create_table(
            table_name=table_name,
            result_set=data,
            is_replace=is_replace,
            is_create=True,
            raise_if_exists=False,
        )

    def run_evaluate(self, params: dict) -> pd.DataFrame:
        # evaluate function entry point

        self._set_llm_client(params.get("llm"))

        if "test_table" not in params:
            raise ValueError('The test table has to be defined in the "test_table" parameter')

        test_table = params["test_table"]

        if isinstance(test_table, str):
            test_table = Identifier(test_table)

        if "generate_data" in params:
            # generate questions / answers using the LLM
            gen_params = params["generate_data"]
            if not isinstance(gen_params, dict):
                gen_params = {}
            test_data = self.generate_test_data(gen_params)

            self.save_to_table(test_table, test_data, is_replace=True)

        if params.get("evaluate", True) is False:
            # no evaluation is required
            return pd.DataFrame()

        test_data = self.read_from_table(test_table)

        scores = self.evaluate(test_data)
        scores["id"] = math.floor(time.time())  # unique ID for the evaluation run
        scores["name"] = self.name
        scores["created_at"] = dt.datetime.now()

        # save scores
        if "save_to" in params:
            to_table = params["save_to"]
            if isinstance(to_table, str):
                to_table = Identifier(to_table)
            self.save_to_table(to_table, scores.copy())

        return scores

    @staticmethod
    def run(session, kb_table, params) -> pd.DataFrame:
        # choose the evaluator version according to the 'version' parameter in the config

        evaluate_version = params.get("version", "doc_id")

        if evaluate_version == "llm_relevancy":
            cls = EvaluateRerank
        elif evaluate_version == "doc_id":
            cls = EvaluateDocID
        else:
            raise NotImplementedError(f"Version of evaluator is not implemented: {evaluate_version}")

        return cls(session, kb_table).run_evaluate(params)
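
    # Illustrative `params` for run() (assumption; the keys come from run_evaluate
    # above, the table names and LLM settings are hypothetical):
    #   {
    #       "test_table": "files.kb_test_data",
    #       "generate_data": {"count": 30},
    #       "version": "llm_relevancy",
    #       "save_to": "files.kb_eval_results",
    #       "llm": {"model_name": "gpt-4o"},
    #   }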

    def generate_question_answer(self, text: str) -> Tuple[str, str]:
        messages = [
            {"role": "system", "content": GENERATE_QA_SYSTEM_PROMPT},
            {"role": "user", "content": f"\n\nText:\n{text}\n\n"},
        ]
        answer = self.llm_client.completion(messages, json_output=True)[0]

        # Sanitize the response by removing markdown code block formatting like ```json
        sanitized_answer = sanitize_json_response(answer)

        try:
            output = json.loads(sanitized_answer)
        except json.JSONDecodeError:
            raise ValueError(f"Could not parse response from LLM: {answer}")

        if "query" not in output or "reference_answer" not in output:
            raise ValueError("Can't find question/answer in LLM response")

        return output.get("query"), output.get("reference_answer")
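
    # Illustrative LLM reply that generate_question_answer() accepts (example taken
    # from GENERATE_QA_SYSTEM_PROMPT above):
    #   '{"query": "What is the name of the river in Paris?", "reference_answer": "Seine"}'
    # which is returned as the tuple ("What is the name of the river in Paris?", "Seine").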

class EvaluateRerank(EvaluateBase):
    """
    Rank responses from the KB using an LLM (by calling the KB reranker function)
    """

    TOP_K = 10

    def generate(self, sampled_df: pd.DataFrame) -> pd.DataFrame:
        qa_data = []
        count_errors = 0
        for chunk_content in sampled_df["chunk_content"]:
            try:
                question, answer = self.generate_question_answer(chunk_content)
            except ValueError as e:
                # allow a limited number of errors
                count_errors += 1
                if count_errors > 5:
                    raise e
                continue

            qa_data.append({"text": chunk_content, "question": question, "answer": answer})

        df = pd.DataFrame(qa_data)
        df["id"] = df.index
        return df

    def evaluate(self, test_data: pd.DataFrame) -> pd.DataFrame:
        json_to_log_list = []
        if {"question", "answer"} - set(test_data.columns):
            raise KeyError(
                f'Test data must contain "question" and "answer" columns. Columns in the provided test data: {list(test_data.columns)}'
            )
        questions = test_data.to_dict("records")

        for i, item in enumerate(questions):
            question = item["question"]
            ground_truth = item["answer"]

            start_time = time.time()
            logger.debug(f"Querying [{i + 1}/{len(questions)}]: {question}")
            df_answers = self.kb.select_query(
                Select(
                    targets=[Identifier("chunk_content")],
                    where=BinaryOperation(op="=", args=[Identifier("content"), Constant(question)]),
                    limit=Constant(self.TOP_K),
                )
            )
            query_time = time.time() - start_time

            proposed_responses = list(df_answers["chunk_content"])

            # score the retrieved documents using the LLM
            relevance_score_list = self.kb.score_documents(question, proposed_responses, self.llm_client.params)

            # set binary relevancy
            binary_relevancy_list = [1 if score >= 0.5 else 0 for score in relevance_score_list]

            # calculate the first relevant position
            first_relevant_position = next((i for i, x in enumerate(binary_relevancy_list) if x == 1), None)
            json_to_log = {
                "question": question,
                "ground_truth": ground_truth,
                # "relevancy_at_k": relevancy_at_k,
                "binary_relevancy_list": binary_relevancy_list,
                "relevance_score_list": relevance_score_list,
                "first_relevant_position": first_relevant_position,
                "query_time": query_time,
            }
            json_to_log_list.append(json_to_log)

        evaluation_results = self.evaluate_retrieval_metrics(json_to_log_list)
        return pd.DataFrame([evaluation_results])

    def evaluate_retrieval_metrics(self, json_to_log_list):
        """
        Computes retrieval evaluation metrics from the result log.

        Metrics computed:
        - Average Relevancy (mean soft relevance score)
        - Average Relevancy@k (soft score)
        - Average First Relevant Position
        - Mean Reciprocal Rank (MRR)
        - Hit@k
        - Binary Precision@k
        - Average Entropy of Relevance Scores
        - Average nDCG

        Args:
            json_to_log_list (list): List of evaluation logs per query.

        Returns:
            dict: A dictionary containing all computed metrics.
        """
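
        # Worked example (illustrative, not from the source): for a single query with
        # relevance scores [0.9, 0.2, 0.7] -> binary relevancy [1, 0, 1],
        # first relevant position 0, reciprocal rank 1.0, hit@k [1, 1, 1],
        # precision@k [1.0, 0.5, 0.67],
        # DCG = 0.9/log2(2) + 0.2/log2(3) + 0.7/log2(4) ≈ 1.376,
        # ideal DCG ≈ 1.442, nDCG ≈ 0.95.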

        mrr_list = []
        hit_at_k_matrix = []
        binary_precision_at_k_matrix = []
        ndcg_list = []
        entropy_list = []

        total_relevancy = 0
        relevance_score_matrix = []
        first_relevant_positions = []

        for item in json_to_log_list:
            binary_relevancy = item["binary_relevancy_list"]
            relevance_scores = item["relevance_score_list"]

            # Skip if empty
            if not relevance_scores:
                continue

            # Mean relevancy per query
            query_relevancy = sum(relevance_scores) / len(relevance_scores)
            total_relevancy += query_relevancy

            # Build score matrix for later average@k
            relevance_score_matrix.append(relevance_scores)

            # First relevant position
            pos = item["first_relevant_position"]
            if pos is not None:
                first_relevant_positions.append(pos)

            # MRR
            reciprocal_rank = 1 / (pos + 1) if pos is not None else 0
            mrr_list.append(reciprocal_rank)

            # Hit@k and Binary Precision@k
            hit_row = []
            precision_row = []
            for k in range(1, len(binary_relevancy) + 1):
                hit = int(any(binary_relevancy[:k]))
                precision = sum(binary_relevancy[:k]) / k
                hit_row.append(hit)
                precision_row.append(precision)
            hit_at_k_matrix.append(hit_row)
            binary_precision_at_k_matrix.append(precision_row)

            # Entropy
            entropy = calc_entropy(relevance_scores) if len(relevance_scores) > 1 else 0
            entropy_list.append(entropy)

            # nDCG
            def dcg(scores):
                return sum(score / math.log2(idx + 2) for idx, score in enumerate(scores))

            ideal = sorted(relevance_scores, reverse=True)
            actual_dcg = dcg(relevance_scores)
            ideal_dcg = dcg(ideal)
            ndcg = actual_dcg / ideal_dcg if ideal_dcg > 0 else 0
            ndcg_list.append(ndcg)

        # Aggregated metrics
        num_queries = len(json_to_log_list)
        average_relevancy = total_relevancy / num_queries if num_queries else 0

        # Relevancy@k
        average_relevance_score_by_k = []
        if relevance_score_matrix:
            relevance_score_matrix = list(zip(*relevance_score_matrix))
            for col in relevance_score_matrix:
                avg_k = sum(col) / len(col)
                average_relevance_score_by_k.append(round(avg_k, 2))

        average_first_relevant_position = (
            sum(first_relevant_positions) / len(first_relevant_positions) if first_relevant_positions else None
        )

        mean_mrr = sum(mrr_list) / len(mrr_list) if mrr_list else 0
        hit_at_k_avg = [round(sum(col) / len(col), 2) for col in zip(*hit_at_k_matrix)] if hit_at_k_matrix else []
        binary_precision_at_k_avg = (
            [round(sum(col) / len(col), 2) for col in zip(*binary_precision_at_k_matrix)]
            if binary_precision_at_k_matrix
            else []
        )
        avg_entropy = sum(entropy_list) / len(entropy_list) if entropy_list else 0
        avg_ndcg = sum(ndcg_list) / len(ndcg_list) if ndcg_list else 0

        avg_query_time = sum(item["query_time"] for item in json_to_log_list) / num_queries

        return {
            "avg_relevancy": average_relevancy,
            "avg_relevance_score_by_k": average_relevance_score_by_k,
            "avg_first_relevant_position": average_first_relevant_position,
            "mean_mrr": mean_mrr,
            "hit_at_k": hit_at_k_avg,
            "bin_precision_at_k": binary_precision_at_k_avg,
            "avg_entropy": avg_entropy,
            "avg_ndcg": avg_ndcg,
            "avg_query_time": avg_query_time,
        }

class EvaluateDocID(EvaluateBase):
    """
    Checks whether the document IDs returned by the KB match the doc IDs in the test dataset
    """

    TOP_K = 20

    def generate(self, sampled_df: pd.DataFrame) -> pd.DataFrame:
        if "id" not in sampled_df.columns:
            raise ValueError("'id' column is required for generating the test dataset")

        qa_data = []
        count_errors = 0
        for _, item in sampled_df.iterrows():
            chunk_content = item["chunk_content"]
            try:
                question, answer = self.generate_question_answer(chunk_content)
            except ValueError as e:
                # allow a limited number of errors
                count_errors += 1
                if count_errors > 5:
                    raise e
                continue

            qa_data.append({"text": chunk_content, "question": question, "answer": answer, "doc_id": item["id"]})
        if len(qa_data) == 0:
            raise ValueError("No data in the generated test dataset")
        df = pd.DataFrame(qa_data)
        return df

    def evaluate(self, test_data: pd.DataFrame) -> pd.DataFrame:
        stats = []
        if {"question", "doc_id"} - set(test_data.columns):
            raise KeyError(
                f'Test data must contain "question" and "doc_id" columns. Columns in the provided test data: {list(test_data.columns)}'
            )
        questions = test_data.to_dict("records")

        for i, item in enumerate(questions):
            question = item["question"]
            doc_id = item["doc_id"]

            start_time = time.time()
            logger.debug(f"Querying [{i + 1}/{len(questions)}]: {question}")
            df_answers = self.kb.select_query(
                Select(
                    targets=[Identifier("chunk_content"), Identifier("id")],
                    where=BinaryOperation(op="=", args=[Identifier("content"), Constant(question)]),
                    limit=Constant(self.TOP_K),
                )
            )
            query_time = time.time() - start_time

            retrieved_doc_ids = list(df_answers["id"])

            if doc_id in retrieved_doc_ids:
                doc_found = True
                doc_position = retrieved_doc_ids.index(doc_id)
            else:
                doc_found = False
                doc_position = -1

            stats.append(
                {
                    "question": question,
                    "doc_id": doc_id,
                    "doc_found": doc_found,
                    "doc_position": doc_position,
                    "query_time": query_time,
                }
            )

        evaluation_results = self.summarize_results(stats)
        return pd.DataFrame([evaluation_results])

    def summarize_results(self, stats):
        total_questions = len(stats)
        total_found = sum([1 for stat in stats if stat["doc_found"]])

        accurate_in_top_10 = sum([1 for stat in stats if stat["doc_found"] and stat["doc_position"] < 10])

        # calculate the recall curve by position
        recall_curve = {}
        for i in range(self.TOP_K):
            recall_curve[i] = sum([1 for stat in stats if stat["doc_found"] and stat["doc_position"] == i])
        # convert to a proportion of total questions
        for i in range(self.TOP_K):
            recall_curve[i] = recall_curve[i] / total_questions
        # calculate cumulative recall
        cumulative_recall = {}
        for i in range(self.TOP_K):
            cumulative_recall[i] = sum([recall_curve[j] for j in range(i + 1)])

        avg_query_time = sum(item["query_time"] for item in stats) / total_questions
        return {
            "total": total_questions,
            "total_found": total_found,
            "retrieved_in_top_10": accurate_in_top_10,
            "cumulative_recall": json.dumps(cumulative_recall),
            "avg_query_time": avg_query_time,
        }
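
    # Illustrative output (not from the source): with TOP_K = 20 and four questions
    # whose source documents are retrieved at positions 0, 0 and 2 (one not found),
    # recall_curve = {0: 0.5, 1: 0.0, 2: 0.25, ...} and
    # cumulative_recall = {0: 0.5, 1: 0.5, 2: 0.75, ...}, i.e. 75% of the questions
    # had their source document retrieved within the top 3 results.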