# mindsdb/integrations/handlers/writer_handler/evaluate.py
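"""Evaluation utilities for the Writer handler's RAG pipeline.

Computes retrieval metrics (cosine similarity, accuracy) and generation
metrics (cosine similarity, accuracy, ROUGE, BLEU, METEOR) over a
question-answering dataframe.
"""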

import ast
from collections import defaultdict
from typing import List

import nltk
import pandas as pd
from nltk import word_tokenize
from nltk.translate.bleu_score import (  # todo investigate why this always returns 0, not used for now
    sentence_bleu,
)
from nltk.translate.meteor_score import meteor_score
from rouge_score import rouge_scorer
from scipy.spatial import distance

from mindsdb.integrations.handlers.writer_handler.settings import (
    WriterHandlerParameters,
)
from mindsdb.utilities import log

# todo use polars for this for speed

logger = log.getLogger(__name__)


class WriterEvaluator:

    def __init__(self, args: WriterHandlerParameters, df: pd.DataFrame, rag):

        self.args = args
        self.df = df
        self.rag = rag(self.args)

        # map each supported metric name to its scoring callable
        self.metric_map = {
            "cosine_similarity": self.calculate_cosine_similarities,
            "accuracy": self.get_matches,
            "rouge": self.calculate_rouge,
            "bleu": self.calculate_bleu,
            "meteor": self.calculate_meteor,
        }

        self.retrieval_metrics = self.args.retrieval_evaluation_metrics
        self.generator_metrics = self.args.generation_evaluation_metrics

        # mean value of each metric column, keyed by "mean_<column name>"
        self.mean_evaluation_metrics = defaultdict(list)

        if args.evaluation_type == "e2e":
            # todo check if this is fine for cloud, better to download once and load from disk
            nltk.download("wordnet")

    def calculate_retrieval_metrics(
        self,
        df: pd.DataFrame,
        context_embeddings,
        retrieved_context_embeddings,
        prefix="retrieval_",
    ):
        """Calculate retrieval metrics and store the mean of each metric column"""

        for metric in self.retrieval_metrics:
            col_name = f"{prefix}{metric}"
            if metric == "cosine_similarity":
                df[col_name] = self.metric_map[metric](
                    context_embeddings, retrieved_context_embeddings
                )
            elif metric == "accuracy":
                col_name = f"{prefix}match"
                df[col_name] = self.get_matches(
                    gt_embeddings=context_embeddings,
                    test_embeddings=retrieved_context_embeddings,
                    threshold=self.args.retriever_match_threshold,
                )
            else:
                raise ValueError(f"metric {metric} not supported")

            self.store_mean_metric(col_name=col_name, mean_metric=df[col_name].mean())

        return df

    def calculate_generation_metrics(
        self,
        df: pd.DataFrame,
        generated_answer_embeddings,
        reference_answer_embeddings,
        prefix="generator_",
    ):
        """Calculate generation metrics and store the mean of each metric column"""

        for metric in self.generator_metrics:
            col_name = f"{prefix}{metric}"
            if metric == "cosine_similarity":
                df[col_name] = self.calculate_cosine_similarities(
                    generated_answer_embeddings, reference_answer_embeddings
                )
            elif metric == "accuracy":
                col_name = f"{prefix}match"
                df[col_name] = self.get_matches(
                    gt_embeddings=reference_answer_embeddings,
                    test_embeddings=generated_answer_embeddings,
                    threshold=self.args.generator_match_threshold,
                )
            elif metric == "rouge":
                df[col_name] = df.apply(
                    lambda x: self.metric_map[metric](
                        x["generated_answers"], x["reference_answers"]
                    ),
                    axis=1,
                )
                self.extract_rouge_scores(df, rouge_scores_col=col_name)
            elif metric == "bleu":
                df[col_name] = df.apply(
                    lambda x: self.metric_map[metric](
                        x["tokenized_generated_answers"],
                        x["tokenized_reference_answers"],
                    ),
                    axis=1,
                )
            elif metric == "meteor":
                df[col_name] = df.apply(
                    lambda x: self.calculate_meteor(
                        x["tokenized_generated_answers"],
                        x["tokenized_reference_answers"],
                    ),
                    axis=1,
                )
            else:
                raise ValueError(f"metric {metric} not supported")

            if metric != "rouge":
                # rouge means are stored per sub-metric in extract_rouge_scores
                self.store_mean_metric(
                    col_name=col_name, mean_metric=df[col_name].mean()
                )

        return df

    def embed_texts(self, texts: List[str]) -> List[List[float]]:
        """Embed a list of texts"""
        return self.rag.embeddings_model.embed_documents(texts)

    def query_vector_store(self, question: str) -> List:
        """Query the vector store"""
        return self.rag.query_vector_store(question)

    @staticmethod
    def extract_returned_text(vector_store_response: List) -> str:
        # todo: this is a hack, we need to fix this so it works with multiple contexts ie top_k>1
        # todo handle empty response
        return [doc.page_content for doc in vector_store_response][0]

    def evaluation_prompt(self, question: str, context: str):
        """Create prompt for evaluating RAG"""

        if self.args.summarize_context:
            return self.rag.summarize_context(
                question=question, combined_context=context
            )

        return self.rag.prompt_template.format(question=question, context=context)

    def get_evaluation_prompts(self, df: pd.DataFrame) -> List[str]:
        """Create prompts for each question and context pair in the dataframe"""
        return [
            self.evaluation_prompt(question, context)
            for question, context in zip(df["question"], df["retrieved_context"])
        ]

    def extract_generated_texts(self, responses: List[str]) -> List[str]:
        """Extract generated text from each LLM response"""
        return [self.rag.extract_generated_text(item) for item in responses]

    @staticmethod
    def extract_reference_answers(df: pd.DataFrame) -> List[str]:
        """Get reference answers for each question in the dataframe"""

        # todo: this is a hack, we need to fix this so it works with multiple answers ie top_k>1
        answers = df["answers"].tolist()
        extracted_answers = []

        for answer in answers:
            try:
                extracted_answers.append(ast.literal_eval(answer)["text"][0])
            except IndexError as e:
                logger.error(e)
                extracted_answers.append("")

        return extracted_answers

    @staticmethod
    def _calculate_cosine_similarity(
        gt_embeddings: List[float], test_embeddings: List[float]
    ) -> float:
        """Calculate cosine similarity between a ground truth and a retrieved/generated embedding"""
        cosine_sim = 1 - distance.cosine(gt_embeddings, test_embeddings)

        return cosine_sim

    def calculate_cosine_similarities(
        self,
        gt_embeddings: List[List[float]],
        test_embeddings: List[List[float]],
    ) -> List[float]:
        """Calculate cosine similarity for each ground truth and retrieved/generated pair for a given question"""

        return [
            self._calculate_cosine_similarity(
                context_embedding, retrieved_context_embedding
            )
            for context_embedding, retrieved_context_embedding in zip(
                gt_embeddings, test_embeddings
            )
        ]

    @staticmethod
    def check_match(cosine_similarity: float, threshold: float = 0.7) -> int:
        return int(cosine_similarity >= threshold)

    def get_matches(
        self, gt_embeddings, test_embeddings, threshold: float = 0.7
    ) -> List[int]:
        """Get matches for each ground truth and retrieved/generated pair for a given question"""

        cosine_similarities = self.calculate_cosine_similarities(
            gt_embeddings=gt_embeddings, test_embeddings=test_embeddings
        )

        matches = [
            self.check_match(cosine_similarity, threshold=threshold)
            for cosine_similarity in cosine_similarities
        ]

        return matches

    @staticmethod
    def _tokenize(text: str) -> List[str]:
        """Tokenize a text"""
        return word_tokenize(text)

    @staticmethod
    def calculate_rouge(generated: str, reference: str) -> dict:
        """Calculate ROUGE-1 and ROUGE-L scores for a generated/reference pair"""
        scorer = rouge_scorer.RougeScorer(["rouge1", "rougeL"], use_stemmer=True)
        score = scorer.score(generated, reference)
        return score

    def extract_rouge_scores(
        self, df: pd.DataFrame, rouge_scores_col: str = "rouge_scores"
    ):
        """Extract per-metric rouge scores from the dataframe and store their means"""
        rouge_metrics = ["rouge1", "rougeL"]
        supported_metrics = ["precision", "recall", "fmeasure"]

        for rouge_metric in rouge_metrics:
            for supported_metric in supported_metrics:
                col_name = f"{rouge_metric}_{supported_metric}"
                df[col_name] = df.apply(
                    lambda x: getattr(
                        x[rouge_scores_col][rouge_metric], supported_metric
                    ),
                    axis=1,
                )

                self.store_mean_metric(
                    col_name=col_name, mean_metric=df[col_name].mean()
                )

    def store_mean_metric(self, col_name: str, mean_metric: float):
        """Store the mean value of a metric column"""

        self.mean_evaluation_metrics[f"mean_{col_name}"].append(mean_metric)

    @staticmethod
    def calculate_bleu(
        generated_tokens: List[str], reference_tokens: List[str]
    ) -> float:
        return sentence_bleu([reference_tokens], generated_tokens)

    @staticmethod
    def calculate_meteor(
        generated_tokens: List[str], reference_tokens: List[str]
    ) -> float:
        return meteor_score([reference_tokens], generated_tokens)

    def evaluate_retrieval(self):
        """Evaluate the retrieval model"""

        df = self.df.copy(deep=True)

        # get question answering results
        df["retrieved_context"] = df.apply(
            lambda x: self.extract_returned_text(
                self.query_vector_store(x["question"])
            ),
            axis=1,
        )

        # embed context and retrieved context
        context_embeddings = self.embed_texts(df["context"].tolist())
        retrieved_context_embeddings = self.embed_texts(
            df["retrieved_context"].tolist()
        )

        df = self.calculate_retrieval_metrics(
            df, context_embeddings, retrieved_context_embeddings
        )

        return df

    def evaluate_generation(self, df: pd.DataFrame):
        """Evaluate the generation model, given the retrieval results df"""

        prompts = self.get_evaluation_prompts(df)

        raw_generated_answers = [self.rag.llm(prompt) for prompt in prompts]

        generated_answers = self.extract_generated_texts(raw_generated_answers)
        reference_answers = self.extract_reference_answers(df)

        df["generated_answers"] = generated_answers
        df["reference_answers"] = reference_answers

        # tokenize generated and reference answers
        df["tokenized_generated_answers"] = df.apply(
            lambda x: self._tokenize(x["generated_answers"]), axis=1
        )
        df["tokenized_reference_answers"] = df.apply(
            lambda x: self._tokenize(x["reference_answers"]), axis=1
        )

        # embed generated answers and reference answers
        generated_answer_embeddings = self.embed_texts(generated_answers)
        reference_answer_embeddings = self.embed_texts(reference_answers)

        df = self.calculate_generation_metrics(
            df, generated_answer_embeddings, reference_answer_embeddings
        )

        return df

    def evaluate_e2e(self):
        """Run the end-to-end evaluation: retrieval followed by generation"""
        retrieval_df = self.evaluate_retrieval()
        e2e_df = self.evaluate_generation(retrieval_df)

        return e2e_df

    def evaluate(self):
        """Run the evaluation specified by args.evaluation_type"""
        if self.args.evaluation_type == "retrieval":
            return self.evaluate_retrieval()
        elif self.args.evaluation_type == "e2e":
            return self.evaluate_e2e()
        else:
            raise ValueError("evaluation_type must be either 'retrieval' or 'e2e'")
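

# Example usage (illustrative sketch only; assumes a configured
# WriterHandlerParameters instance and the handler's RAG question-answering
# class — names not defined above, such as QuestionAnswerer and evaluation_df,
# are placeholders):
#
#   args = WriterHandlerParameters(...)  # evaluation_type="retrieval" or "e2e"
#   evaluator = WriterEvaluator(args, evaluation_df, QuestionAnswerer)
#   results_df = evaluator.evaluate()
#   mean_metrics = evaluator.mean_evaluation_metrics  # {"mean_<metric>": [...], ...}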