Coverage for mindsdb/interfaces/knowledge_base/evaluate.py: 11%
291 statements
coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
import json
import math
import re
import time
import copy
from typing import List, Tuple

import pandas as pd
import datetime as dt

from mindsdb.api.executor.sql_query.result_set import ResultSet
from mindsdb_sql_parser import Identifier, Select, Constant, Star, parse_sql, BinaryOperation
from mindsdb.utilities import log
from mindsdb.utilities.config import config

from mindsdb.interfaces.knowledge_base.llm_client import LLMClient

logger = log.getLogger(__name__)


GENERATE_QA_SYSTEM_PROMPT = """
Your task is to generate question and answer pairs for a search engine.
The search engine will take your query and return a list of documents.
You will be given a text, and you need to generate a question that can be answered using the information in that text.
Your questions will be used to evaluate the search engine.
A question should always have enough clues to identify the specific text it was generated from.
Never ask questions like "What license number is associated with Amend 6?" because Amend 6 could be found in many documents, so the question is not specific enough.
Example output 1: {\"query\": \"What processor does the HP 2023 14\" FHD IPS Laptop use?\", \"reference_answer\": \"Ryzen 3 5300U\"}
Example output 2: {\"query\": \"What is the name of the river in Paris?\", \"reference_answer\": \"Seine\"}
Don't generate questions like "What is being amended in the application?" because such questions cannot be answered without knowing which document they refer to.
The question should be answerable without the text, but the answer should be present in the text.
Return ONLY a json response. No other text.
"""


def calc_entropy(values: List[float]) -> float:
    """
    Alternative to scipy.stats.entropy, to avoid adding a `scipy` dependency.

    :param values: Input distribution
    :return: The calculated entropy.
    """
    # normalize the distribution and drop non-positive values
    total = sum(values)
    values = [i / total for i in values if i > 0]
    # Shannon entropy (natural log)
    return -sum(pk * math.log(pk) for pk in values)
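
# Illustrative check (a sketch, not part of the module): a uniform distribution
# maximizes entropy, while a fully peaked one has zero entropy.
#   calc_entropy([1, 1, 1, 1])  # == math.log(4) ~= 1.386
#   calc_entropy([1, 0, 0, 0])  # == 0.0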


def sanitize_json_response(response: str) -> str:
    """Remove markdown code block formatting from JSON response and extract valid JSON."""
    if not response or not response.strip():
        raise ValueError("Empty response provided.")

    # Remove leading/trailing whitespace
    response = response.strip()

    # Remove markdown code block markers if present
    response = re.sub(r"^```(?:json|JSON)?\s*", "", response, flags=re.MULTILINE)
    response = re.sub(r"\s*```$", "", response, flags=re.MULTILINE)
    response = response.strip()

    # Find the first opening brace
    start_idx = response.find("{")
    if start_idx == -1:
        raise ValueError("No JSON object found in the response.")

    # Try to parse JSON starting from the first `{`, shrinking the end position
    # from the back; this handles nested objects and strings with braces correctly
    for end_idx in range(len(response), start_idx, -1):
        candidate = response[start_idx:end_idx]
        try:
            parsed = json.loads(candidate)
            # Ensure it's a dictionary (object), not just any valid JSON
            if isinstance(parsed, dict):
                return candidate
        except json.JSONDecodeError:
            continue

    raise ValueError("No valid JSON object found in the response.")
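
# Illustrative usage (a sketch): fenced LLM output is reduced to the bare JSON object.
#   sanitize_json_response('```json\n{"query": "q1"}\n```')  # -> '{"query": "q1"}'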


class EvaluateBase:
    DEFAULT_QUESTION_COUNT = 20
    DEFAULT_SAMPLE_SIZE = 10000

    def __init__(self, session, knowledge_base):
        self.kb = knowledge_base
        self.name = knowledge_base._kb.name
        self.session = session

        self.llm_client = None

    def generate(self, sampled_df: pd.DataFrame) -> pd.DataFrame:
        # generate test data from the sample
        raise NotImplementedError

    def evaluate(self, test_data: pd.DataFrame) -> pd.DataFrame:
        # create evaluation metrics from test data
        raise NotImplementedError

    def _set_llm_client(self, llm_params: dict):
        """
        Logic to resolve the LLM setting:
        - first use the `llm` setting of the 'evaluate' command
        - if not defined, fall back to the knowledge base reranker config
        """
        if llm_params is None:
            llm_params = self.kb._kb.params.get("reranking_model")

        params = copy.deepcopy(config.get("default_llm", {}))

        if llm_params:
            params.update(llm_params)

        self.llm_client = LLMClient(params)
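
    # Illustrative precedence (a sketch; keys and values are hypothetical):
    #   config default_llm: {"provider": "openai", "model_name": "gpt-4o"}
    #   llm_params:         {"model_name": "gpt-4.1"}
    #   effective params:   {"provider": "openai", "model_name": "gpt-4.1"}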

    def generate_test_data(self, gen_params: dict) -> pd.DataFrame:
        # Extract source data (from the user's query or from the KB itself) and call `generate` to get test data

        if "from_sql" in gen_params:
            # get data from sql
            query = parse_sql(gen_params["from_sql"])
            if not isinstance(query, Select) or query.from_table is None:
                raise ValueError(f"Query not supported: {gen_params['from_sql']}")

            dn, table_name = self._get_dn_table(query.from_table)
            query.from_table = table_name
            if query.limit is None:
                query.limit = Constant(self.DEFAULT_SAMPLE_SIZE)

            response = dn.query(query=query, session=self.session)
            df = response.data_frame

            if "content" not in df.columns:
                raise ValueError(f"`content` column isn't found in provided sql: {gen_params['from_sql']}")

            df.rename(columns={"content": "chunk_content"}, inplace=True)
        else:
            # get data from knowledge base
            df = self.kb.select_query(
                Select(
                    targets=[Identifier("chunk_content"), Identifier("id")], limit=Constant(self.DEFAULT_SAMPLE_SIZE)
                )
            )

        if "count" in gen_params:
            number_of_questions = gen_params["count"]
        else:
            number_of_questions = self.DEFAULT_QUESTION_COUNT

        number_of_questions = min(number_of_questions, len(df))
        sampled_df = df.sample(n=number_of_questions)

        return self.generate(sampled_df)
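
    # Example `generate_data` parameters (table name and values are illustrative):
    #   {"from_sql": "SELECT content FROM my_db.docs LIMIT 500", "count": 50}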

    def read_from_table(self, test_table: Identifier) -> pd.DataFrame:
        # read data from a table

        dn, table_name = self._get_dn_table(test_table)

        query = Select(
            targets=[Star()],
            from_table=table_name,
        )
        response = dn.query(query=query, session=self.session)
        return response.data_frame

    def _get_dn_table(self, table_name: Identifier):
        if len(table_name.parts) < 2:
            raise ValueError(f"Can't find database, table name must have at least 2 parts: {table_name}")

        integration_name = table_name.parts[0]
        table_name = Identifier(parts=table_name.parts[1:])
        dn = self.session.datahub.get(integration_name)
        if dn is None:
            raise ValueError(f"Can't find database: {integration_name}")
        return dn, table_name
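
    # e.g. (hypothetical names) Identifier(parts=["my_pg", "public", "docs"])
    #   -> (datanode for "my_pg", Identifier(parts=["public", "docs"]))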

    def save_to_table(self, table_name: Identifier, df: pd.DataFrame, is_replace=False):
        # save data to a table

        dn, table_name = self._get_dn_table(table_name)

        data = ResultSet.from_df(df)

        dn.create_table(
            table_name=table_name,
            result_set=data,
            is_replace=is_replace,
            is_create=True,
            raise_if_exists=False,
        )

    def run_evaluate(self, params: dict) -> pd.DataFrame:
        # evaluate function entry point

        self._set_llm_client(params.get("llm"))

        if "test_table" not in params:
            raise ValueError('The test table has to be defined in the "test_table" parameter')
        test_table = params["test_table"]

        if isinstance(test_table, str):
            test_table = Identifier(test_table)

        if "generate_data" in params:
            # generate question / answer pairs using the llm
            gen_params = params["generate_data"]
            if not isinstance(gen_params, dict):
                gen_params = {}
            test_data = self.generate_test_data(gen_params)

            self.save_to_table(test_table, test_data, is_replace=True)

        if params.get("evaluate", True) is False:
            # no evaluation is required
            return pd.DataFrame()

        test_data = self.read_from_table(test_table)

        scores = self.evaluate(test_data)
        scores["id"] = math.floor(time.time())  # unique ID for the evaluation run
        scores["name"] = self.name
        scores["created_at"] = dt.datetime.now()

        # save scores
        if "save_to" in params:
            to_table = params["save_to"]
            if isinstance(to_table, str):
                to_table = Identifier(to_table)
            self.save_to_table(to_table, scores.copy())

        return scores

    @staticmethod
    def run(session, kb_table, params) -> pd.DataFrame:
        # choose the evaluator version according to the 'version' parameter

        evaluate_version = params.get("version", "doc_id")

        if evaluate_version == "llm_relevancy":
            cls = EvaluateRerank
        elif evaluate_version == "doc_id":
            cls = EvaluateDocID
        else:
            raise NotImplementedError(f"Version of evaluator is not implemented: {evaluate_version}")

        return cls(session, kb_table).run_evaluate(params)
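
    # Illustrative `params` for run() (keys come from this module; table names are hypothetical):
    #   {
    #       "test_table": "files.eval_qa",        # required
    #       "generate_data": {"count": 20},       # optional: build QA pairs first
    #       "version": "doc_id",                  # or "llm_relevancy"
    #       "evaluate": True,                     # set False to only generate data
    #       "save_to": "files.eval_scores",       # optional: persist metrics
    #   }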

    def generate_question_answer(self, text: str) -> Tuple[str, str]:
        messages = [
            {"role": "system", "content": GENERATE_QA_SYSTEM_PROMPT},
            {"role": "user", "content": f"\n\nText:\n{text}\n\n"},
        ]
        answer = self.llm_client.completion(messages, json_output=True)[0]

        # Sanitize the response by removing markdown code block formatting like ```json
        sanitized_answer = sanitize_json_response(answer)

        try:
            output = json.loads(sanitized_answer)
        except json.JSONDecodeError:
            raise ValueError(f"Could not parse response from LLM: {answer}")

        if "query" not in output or "reference_answer" not in output:
            raise ValueError("Can't find question/answer in LLM response")

        return output.get("query"), output.get("reference_answer")


class EvaluateRerank(EvaluateBase):
    """
    Rank responses from the KB using an LLM (by calling the KB reranker function)
    """

    TOP_K = 10

    def generate(self, sampled_df: pd.DataFrame) -> pd.DataFrame:
        qa_data = []
        count_errors = 0
        for chunk_content in sampled_df["chunk_content"]:
            try:
                question, answer = self.generate_question_answer(chunk_content)
            except ValueError as e:
                # allow a limited number of errors
                count_errors += 1
                if count_errors > 5:
                    raise e
                continue

            qa_data.append({"text": chunk_content, "question": question, "answer": answer})

        df = pd.DataFrame(qa_data)
        df["id"] = df.index
        return df

    def evaluate(self, test_data: pd.DataFrame) -> pd.DataFrame:
        json_to_log_list = []
        if {"question", "answer"} - set(test_data.columns):
            raise KeyError(
                f'Test data must contain "question" and "answer" columns. Columns in the provided test data: {list(test_data.columns)}'
            )
        questions = test_data.to_dict("records")

        for i, item in enumerate(questions):
            question = item["question"]
            ground_truth = item["answer"]

            start_time = time.time()
            logger.debug(f"Querying [{i + 1}/{len(questions)}]: {question}")
            df_answers = self.kb.select_query(
                Select(
                    targets=[Identifier("chunk_content")],
                    where=BinaryOperation(op="=", args=[Identifier("content"), Constant(question)]),
                    limit=Constant(self.TOP_K),
                )
            )
            query_time = time.time() - start_time

            proposed_responses = list(df_answers["chunk_content"])

            # score the retrieved responses with the llm reranker
            relevance_score_list = self.kb.score_documents(question, proposed_responses, self.llm_client.params)

            # set binary relevancy
            binary_relevancy_list = [1 if score >= 0.5 else 0 for score in relevance_score_list]

            # calculate the first relevant position
            first_relevant_position = next((i for i, x in enumerate(binary_relevancy_list) if x == 1), None)
            json_to_log = {
                "question": question,
                "ground_truth": ground_truth,
                "binary_relevancy_list": binary_relevancy_list,
                "relevance_score_list": relevance_score_list,
                "first_relevant_position": first_relevant_position,
                "query_time": query_time,
            }
            json_to_log_list.append(json_to_log)

        evaluation_results = self.evaluate_retrieval_metrics(json_to_log_list)
        return pd.DataFrame([evaluation_results])

    def evaluate_retrieval_metrics(self, json_to_log_list):
        """
        Computes retrieval evaluation metrics from the result log.

        Metrics computed:
        - Average Relevancy (mean soft relevance score)
        - Average Relevancy@k (soft score)
        - Average First Relevant Position
        - Mean Reciprocal Rank (MRR)
        - Hit@k
        - Binary Precision@k
        - Average Entropy of Relevance Scores
        - Average nDCG

        Args:
            json_to_log_list (list): List of evaluation logs per query.

        Returns:
            dict: A dictionary containing all computed metrics.
        """

        mrr_list = []
        hit_at_k_matrix = []
        binary_precision_at_k_matrix = []
        ndcg_list = []
        entropy_list = []

        total_relevancy = 0
        relevance_score_matrix = []
        first_relevant_positions = []

        for item in json_to_log_list:
            binary_relevancy = item["binary_relevancy_list"]
            relevance_scores = item["relevance_score_list"]

            # Skip if empty
            if not relevance_scores:
                continue

            # Mean relevancy per query
            query_relevancy = sum(relevance_scores) / len(relevance_scores)
            total_relevancy += query_relevancy

            # Build the score matrix for the later average@k
            relevance_score_matrix.append(relevance_scores)

            # First relevant position
            pos = item["first_relevant_position"]
            if pos is not None:
                first_relevant_positions.append(pos)

            # MRR
            reciprocal_rank = 1 / (pos + 1) if pos is not None else 0
            mrr_list.append(reciprocal_rank)

            # Hit@k and Binary Precision@k
            hit_row = []
            precision_row = []
            for k in range(1, len(binary_relevancy) + 1):
                hit = int(any(binary_relevancy[:k]))
                precision = sum(binary_relevancy[:k]) / k
                hit_row.append(hit)
                precision_row.append(precision)
            hit_at_k_matrix.append(hit_row)
            binary_precision_at_k_matrix.append(precision_row)

            # Entropy
            entropy = calc_entropy(relevance_scores) if len(relevance_scores) > 1 else 0
            entropy_list.append(entropy)

            # nDCG
            def dcg(scores):
                return sum(score / math.log2(idx + 2) for idx, score in enumerate(scores))

            ideal = sorted(relevance_scores, reverse=True)
            actual_dcg = dcg(relevance_scores)
            ideal_dcg = dcg(ideal)
            ndcg = actual_dcg / ideal_dcg if ideal_dcg > 0 else 0
            ndcg_list.append(ndcg)

        # Aggregated metrics
        num_queries = len(json_to_log_list)
        average_relevancy = total_relevancy / num_queries if num_queries else 0

        # Relevancy@k
        average_relevance_score_by_k = []
        if relevance_score_matrix:
            relevance_score_matrix = list(zip(*relevance_score_matrix))
            for col in relevance_score_matrix:
                avg_k = sum(col) / len(col)
                average_relevance_score_by_k.append(round(avg_k, 2))

        average_first_relevant_position = (
            sum(first_relevant_positions) / len(first_relevant_positions) if first_relevant_positions else None
        )

        mean_mrr = sum(mrr_list) / len(mrr_list) if mrr_list else 0
        hit_at_k_avg = [round(sum(col) / len(col), 2) for col in zip(*hit_at_k_matrix)] if hit_at_k_matrix else []
        binary_precision_at_k_avg = (
            [round(sum(col) / len(col), 2) for col in zip(*binary_precision_at_k_matrix)]
            if binary_precision_at_k_matrix
            else []
        )
        avg_entropy = sum(entropy_list) / len(entropy_list) if entropy_list else 0
        avg_ndcg = sum(ndcg_list) / len(ndcg_list) if ndcg_list else 0

        avg_query_time = sum(item["query_time"] for item in json_to_log_list) / num_queries

        return {
            "avg_relevancy": average_relevancy,
            "avg_relevance_score_by_k": average_relevance_score_by_k,
            "avg_first_relevant_position": average_first_relevant_position,
            "mean_mrr": mean_mrr,
            "hit_at_k": hit_at_k_avg,
            "bin_precision_at_k": binary_precision_at_k_avg,
            "avg_entropy": avg_entropy,
            "avg_ndcg": avg_ndcg,
            "avg_query_time": avg_query_time,
        }
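
    # Worked example for a single query (a sketch): relevance_score_list = [0.2, 0.9, 0.8]
    #   binary_relevancy_list   = [0, 1, 1]  (threshold 0.5)
    #   first_relevant_position = 1 -> reciprocal rank = 1/2
    #   hit@k                   = [0, 1, 1]
    #   precision@k             = [0.0, 0.5, 0.67]
    #   nDCG: DCG = 0.2/log2(2) + 0.9/log2(3) + 0.8/log2(4) ~= 1.168,
    #         IDCG (scores sorted desc: [0.9, 0.8, 0.2]) ~= 1.505, so nDCG ~= 0.78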


class EvaluateDocID(EvaluateBase):
    """
    Checks whether the IDs returned by the KB match the doc IDs in the test dataset
    """

    TOP_K = 20

    def generate(self, sampled_df: pd.DataFrame) -> pd.DataFrame:
        if "id" not in sampled_df.columns:
            raise ValueError("'id' column is required for generating the test dataset")

        qa_data = []
        count_errors = 0
        for _, item in sampled_df.iterrows():
            chunk_content = item["chunk_content"]
            try:
                question, answer = self.generate_question_answer(chunk_content)
            except ValueError as e:
                # allow a limited number of errors
                count_errors += 1
                if count_errors > 5:
                    raise e
                continue

            qa_data.append({"text": chunk_content, "question": question, "answer": answer, "doc_id": item["id"]})
        if len(qa_data) == 0:
            raise ValueError("No data in generated test dataset")
        df = pd.DataFrame(qa_data)
        return df

    def evaluate(self, test_data: pd.DataFrame) -> pd.DataFrame:
        stats = []
        if {"question", "doc_id"} - set(test_data.columns):
            raise KeyError(
                f'Test data must contain "question" and "doc_id" columns. Columns in the provided test data: {list(test_data.columns)}'
            )
        questions = test_data.to_dict("records")

        for i, item in enumerate(questions):
            question = item["question"]
            doc_id = item["doc_id"]

            start_time = time.time()
            logger.debug(f"Querying [{i + 1}/{len(questions)}]: {question}")
            df_answers = self.kb.select_query(
                Select(
                    targets=[Identifier("chunk_content"), Identifier("id")],
                    where=BinaryOperation(op="=", args=[Identifier("content"), Constant(question)]),
                    limit=Constant(self.TOP_K),
                )
            )
            query_time = time.time() - start_time

            retrieved_doc_ids = list(df_answers["id"])

            if doc_id in retrieved_doc_ids:
                doc_found = True
                doc_position = retrieved_doc_ids.index(doc_id)
            else:
                doc_found = False
                doc_position = -1

            stats.append(
                {
                    "question": question,
                    "doc_id": doc_id,
                    "doc_found": doc_found,
                    "doc_position": doc_position,
                    "query_time": query_time,
                }
            )

        evaluation_results = self.summarize_results(stats)
        return pd.DataFrame([evaluation_results])

    def summarize_results(self, stats):
        total_questions = len(stats)
        total_found = sum(1 for stat in stats if stat["doc_found"])

        accurate_in_top_10 = sum(1 for stat in stats if stat["doc_found"] and stat["doc_position"] < 10)

        # calculate the recall curve by position
        recall_curve = {}
        for i in range(self.TOP_K):
            recall_curve[i] = sum(1 for stat in stats if stat["doc_found"] and stat["doc_position"] == i)
        # convert to a proportion of total questions
        for i in range(self.TOP_K):
            recall_curve[i] = recall_curve[i] / total_questions
        # calculate cumulative recall
        cumulative_recall = {}
        for i in range(self.TOP_K):
            cumulative_recall[i] = sum(recall_curve[j] for j in range(i + 1))

        avg_query_time = sum(item["query_time"] for item in stats) / total_questions
        return {
            "total": total_questions,
            "total_found": total_found,
            "retrieved_in_top_10": accurate_in_top_10,
            "cumulative_recall": json.dumps(cumulative_recall),
            "avg_query_time": avg_query_time,
        }
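
    # Illustrative output row (values hypothetical; only the first cumulative_recall
    # entries are shown, and its keys become strings after json.dumps):
    #   {"total": 20, "total_found": 18, "retrieved_in_top_10": 15,
    #    "cumulative_recall": '{"0": 0.5, "1": 0.65, "2": 0.7}', "avg_query_time": 0.42}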