Coverage for mindsdb/integrations/handlers/writer_handler/evaluate.py: 0% of 152 statements

import ast
from collections import defaultdict
from typing import List

import nltk
import pandas as pd
from nltk import word_tokenize
from nltk.translate.bleu_score import (  # todo investigate why this always returns 0, not used for now
    sentence_bleu,
)
from nltk.translate.meteor_score import meteor_score
from rouge_score import rouge_scorer
from scipy.spatial import distance

from mindsdb.integrations.handlers.writer_handler.settings import (
    WriterHandlerParameters,
)
from mindsdb.utilities import log

# todo use polars for this for speed

logger = log.getLogger(__name__)


class WriterEvaluator:
    def __init__(self, args: WriterHandlerParameters, df: pd.DataFrame, rag):
        self.args = args
        self.df = df
        self.rag = rag(self.args)

        self.metric_map = {
            "cosine_similarity": self.calculate_cosine_similarities,
            "accuracy": self.get_matches,
            "rouge": self.calculate_rouge,
            "bleu": self.calculate_bleu,
            "meteor": self.calculate_meteor,
        }

        self.retrieval_metrics = self.args.retrieval_evaluation_metrics
        self.generator_metrics = self.args.generation_evaluation_metrics

        self.mean_evaluation_metrics = defaultdict(list)

        if args.evaluation_type == "e2e":
            # todo check if this is fine for cloud, better to download once and load from disk
            # wordnet is required by meteor_score; punkt by word_tokenize
            nltk.download("wordnet")
            nltk.download("punkt")

    def calculate_retrieval_metrics(
        self,
        df: pd.DataFrame,
        context_embeddings,
        retrieved_context_embeddings,
        prefix="retrieval_",
    ):
        """Calculate retrieval metrics"""

        for metric in self.retrieval_metrics:
            col_name = f"{prefix}{metric}"
            if metric == "cosine_similarity":
                df[col_name] = self.metric_map[metric](
                    context_embeddings, retrieved_context_embeddings
                )
            elif metric == "accuracy":
                col_name = f"{prefix}match"
                df[col_name] = self.get_matches(
                    gt_embeddings=context_embeddings,
                    test_embeddings=retrieved_context_embeddings,
                    threshold=self.args.retriever_match_threshold,
                )
            else:
                raise ValueError(f"metric {metric} not supported")

            self.store_mean_metric(col_name=col_name, mean_metric=df[col_name].mean())

        return df

    def calculate_generation_metrics(
        self,
        df: pd.DataFrame,
        generated_answer_embeddings,
        reference_answer_embeddings,
        prefix="generator_",
    ):
        """Calculate generation metrics"""

        for metric in self.generator_metrics:
            col_name = f"{prefix}{metric}"
            if metric == "cosine_similarity":
                df[col_name] = self.calculate_cosine_similarities(
                    generated_answer_embeddings, reference_answer_embeddings
                )
            elif metric == "accuracy":
                col_name = f"{prefix}match"
                df[col_name] = self.get_matches(
                    gt_embeddings=reference_answer_embeddings,
                    test_embeddings=generated_answer_embeddings,
                    threshold=self.args.generator_match_threshold,
                )
            elif metric == "rouge":
                df[col_name] = df.apply(
                    lambda x: self.metric_map[metric](
                        x["generated_answers"], x["reference_answers"]
                    ),
                    axis=1,
                )
                self.extract_rouge_scores(df, rouge_scores_col=col_name)
            elif metric == "bleu":
                df[col_name] = df.apply(
                    lambda x: self.metric_map[metric](
                        x["tokenized_generated_answers"],
                        x["tokenized_reference_answers"],
                    ),
                    axis=1,
                )
            elif metric == "meteor":
                df[col_name] = df.apply(
                    lambda x: self.calculate_meteor(
                        x["tokenized_generated_answers"],
                        x["tokenized_reference_answers"],
                    ),
                    axis=1,
                )
            else:
                raise ValueError(f"metric {metric} not supported")

            if metric != "rouge":
                # rouge columns hold dicts; their means are stored per
                # sub-metric inside extract_rouge_scores instead
                self.store_mean_metric(
                    col_name=col_name, mean_metric=df[col_name].mean()
                )

        return df

    def embed_texts(self, texts: List[str]) -> List[list]:
        """Embed a list of texts"""
        return self.rag.embeddings_model.embed_documents(texts)

    def query_vector_store(self, question: str) -> List:
        """Query the vector store"""
        return self.rag.query_vector_store(question)

    @staticmethod
    def extract_returned_text(vector_store_response: List) -> str:
        # todo: this is a hack, we need to fix this so it works with multiple contexts, i.e. top_k > 1
        # todo handle empty response
        return [doc.page_content for doc in vector_store_response][0]
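
    # For the top_k > 1 todo above, a fuller version might join all retrieved
    # documents rather than taking only the first, e.g.
    #   "\n".join(doc.page_content for doc in vector_store_response)
    # (a sketch only, not the current behavior).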

    def evaluation_prompt(self, question: str, context: str):
        """Create prompt for evaluating RAG"""

        if self.args.summarize_context:
            return self.rag.summarize_context(
                question=question, combined_context=context
            )

        return self.rag.prompt_template.format(question=question, context=context)

    def get_evaluation_prompts(self, df: pd.DataFrame) -> List[str]:
        """Create prompts for each question and context pair in the dataframe"""
        return [
            self.evaluation_prompt(question, context)
            for question, context in zip(df["question"], df["retrieved_context"])
        ]

    def extract_generated_texts(self, responses: List[str]) -> List[str]:
        """Extract generated text from each LLM response"""
        return [self.rag.extract_generated_text(response) for response in responses]

    @staticmethod
    def extract_reference_answers(df: pd.DataFrame) -> List[str]:
        """Get reference answers for each question in the dataframe"""

        # todo: this is a hack, we need to fix this so it works with multiple answers, i.e. top_k > 1
        answers = df["answers"].tolist()
        extracted_answers = []

        for answer in answers:
            try:
                extracted_answers.append(ast.literal_eval(answer)["text"][0])
            except IndexError as e:
                logger.error(e)
                extracted_answers.append("")

        return extracted_answers
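
    # Note: extract_reference_answers assumes each entry in df["answers"] is the
    # string repr of a SQuAD-style dict, e.g. "{'text': ['some answer'], 'answer_start': [0]}".
    # This shape is inferred from the parsing above; the 'answer_start' key is an
    # assumption, not something defined in this file.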

    @staticmethod
    def _calculate_cosine_similarity(
        gt_embeddings: List[float], test_embeddings: List[float]
    ) -> float:
        """Calculate cosine similarity between a ground truth and a test embedding"""
        cosine_sim = 1 - distance.cosine(gt_embeddings, test_embeddings)

        return cosine_sim
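
    # For intuition: identical embeddings score 1.0 and orthogonal embeddings 0.0, e.g.
    #   _calculate_cosine_similarity([1.0, 0.0], [1.0, 0.0]) -> 1.0
    #   _calculate_cosine_similarity([1.0, 0.0], [0.0, 1.0]) -> 0.0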

    def calculate_cosine_similarities(
        self,
        gt_embeddings: List[List[float]],
        test_embeddings: List[List[float]],
    ):
        """Calculate cosine similarity for each ground truth and retrieved/generated pair for a given question"""

        return [
            self._calculate_cosine_similarity(gt_embedding, test_embedding)
            for gt_embedding, test_embedding in zip(gt_embeddings, test_embeddings)
        ]

    @staticmethod
    def check_match(cosine_similarity: float, threshold: float = 0.7) -> int:
        return int(cosine_similarity >= threshold)

    def get_matches(
        self, gt_embeddings, test_embeddings, threshold: float = 0.7
    ) -> List[int]:
        """Get matches for each ground truth and retrieved/generated pair for a given question"""

        cosine_similarities = self.calculate_cosine_similarities(
            gt_embeddings=gt_embeddings, test_embeddings=test_embeddings
        )

        matches = [
            self.check_match(cosine_similarity, threshold=threshold)
            for cosine_similarity in cosine_similarities
        ]

        return matches

    @staticmethod
    def _tokenize(text: str) -> List[str]:
        """Tokenize a text"""
        return word_tokenize(text)

    @staticmethod
    def calculate_rouge(generated: str, reference: str) -> dict:
        scorer = rouge_scorer.RougeScorer(["rouge1", "rougeL"], use_stemmer=True)
        # rouge_score's RougeScorer.score expects (target, prediction),
        # i.e. the reference text first
        score = scorer.score(reference, generated)
        return score
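
    # RougeScorer.score returns a dict mapping each requested metric ("rouge1",
    # "rougeL") to a Score namedtuple with precision, recall and fmeasure fields;
    # extract_rouge_scores below flattens these into per-field columns.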

    def extract_rouge_scores(
        self, df: pd.DataFrame, rouge_scores_col: str = "rouge_scores"
    ):
        """Extract rouge scores from dataframe"""
        rouge_metrics = ["rouge1", "rougeL"]
        supported_metrics = ["precision", "recall", "fmeasure"]

        for rouge_metric in rouge_metrics:
            for supported_metric in supported_metrics:
                col_name = f"{rouge_metric}_{supported_metric}"
                df[col_name] = df.apply(
                    lambda x: getattr(
                        x[rouge_scores_col][rouge_metric], supported_metric
                    ),
                    axis=1,
                )

                self.store_mean_metric(
                    col_name=col_name, mean_metric=df[col_name].mean()
                )

    def store_mean_metric(self, col_name: str, mean_metric: float):
        """Store the mean value of a metric column"""

        self.mean_evaluation_metrics[f"mean_{col_name}"].append(mean_metric)

    @staticmethod
    def calculate_bleu(
        generated_tokens: List[str], reference_tokens: List[str]
    ) -> float:
        return sentence_bleu([reference_tokens], generated_tokens)
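
    # Re the import todo: sentence-level BLEU collapses to ~0 whenever the
    # hypothesis shares no higher-order n-gram with the reference, which is
    # common for short answers. A plausible fix (an assumption, not verified
    # against this handler) is NLTK's smoothing:
    #   from nltk.translate.bleu_score import SmoothingFunction
    #   sentence_bleu([reference_tokens], generated_tokens,
    #                 smoothing_function=SmoothingFunction().method1)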

    @staticmethod
    def calculate_meteor(
        generated_tokens: List[str], reference_tokens: List[str]
    ) -> float:
        return meteor_score([reference_tokens], generated_tokens)

    def evaluate_retrieval(self):
        """Evaluate the retrieval model"""

        df = self.df.copy(deep=True)

        # get question answering results
        df["retrieved_context"] = df.apply(
            lambda x: self.extract_returned_text(
                self.query_vector_store(x["question"])
            ),
            axis=1,
        )

        # embed context and retrieved context
        context_embeddings = self.embed_texts(df["context"].tolist())
        retrieved_context_embeddings = self.embed_texts(
            df["retrieved_context"].tolist()
        )

        df = self.calculate_retrieval_metrics(
            df, context_embeddings, retrieved_context_embeddings
        )

        return df

    def evaluate_generation(self, df: pd.DataFrame):
        """Evaluate the generation model, given the retrieval results df"""

        prompts = self.get_evaluation_prompts(df)

        raw_generated_answers = [self.rag.llm(prompt) for prompt in prompts]

        generated_answers = self.extract_generated_texts(raw_generated_answers)
        reference_answers = self.extract_reference_answers(df)

        df["generated_answers"] = generated_answers
        df["reference_answers"] = reference_answers

        # tokenize generated and reference answers
        df["tokenized_generated_answers"] = df.apply(
            lambda x: self._tokenize(x["generated_answers"]), axis=1
        )
        df["tokenized_reference_answers"] = df.apply(
            lambda x: self._tokenize(x["reference_answers"]), axis=1
        )

        # embed generated and reference answers
        generated_answer_embeddings = self.embed_texts(generated_answers)
        reference_answer_embeddings = self.embed_texts(reference_answers)

        df = self.calculate_generation_metrics(
            df, generated_answer_embeddings, reference_answer_embeddings
        )

        return df

    def evaluate_e2e(self):
        """Run retrieval evaluation followed by generation evaluation"""
        retrieval_df = self.evaluate_retrieval()
        e2e_df = self.evaluate_generation(retrieval_df)

        return e2e_df

    def evaluate(self):
        if self.args.evaluation_type == "retrieval":
            return self.evaluate_retrieval()
        elif self.args.evaluation_type == "e2e":
            return self.evaluate_e2e()
        else:
            raise ValueError("evaluation_type must be either 'retrieval' or 'e2e'")
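

# Example usage (a hypothetical sketch: QuestionAnswerer and the exact
# WriterHandlerParameters fields are assumptions, not defined in this file;
# rag just needs to be a callable that accepts args):
#
#   args = WriterHandlerParameters(evaluation_type="e2e", ...)
#   df = pd.DataFrame({"question": [...], "context": [...], "answers": [...]})
#   evaluator = WriterEvaluator(args=args, df=df, rag=QuestionAnswerer)
#   results_df = evaluator.evaluate()
#   print(evaluator.mean_evaluation_metrics)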