Coverage for mindsdb/integrations/handlers/rag_handler/rag_handler.py: 0%

52 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Dict, Optional 

2 

3import pandas as pd 

4 

5from mindsdb.integrations.handlers.rag_handler.ingest import RAGIngestor 

6from mindsdb.integrations.handlers.rag_handler.rag import RAGQuestionAnswerer 

7from mindsdb.integrations.handlers.rag_handler.settings import ( 

8 DEFAULT_EMBEDDINGS_MODEL, 

9 RAGHandlerParameters, 

10 build_llm_params, 

11) 

12from mindsdb.integrations.libs.base import BaseMLEngine 

13from mindsdb.utilities import log 

14 

15 

logger = log.getLogger(__name__)

# One-time deprecation notice emitted when this module is imported: the RAG
# handler is scheduled for removal and users should migrate to Agents + the
# Retrieval skill instead.
logger.warning(
    "\nThe RAG handler has been deprecated and is no longer being actively supported. \n"
    "It will be fully removed in v24.8.x.x, for RAG workflows, please migrate to "
    "Agents + Retrieval skill. \nExample usage can be found here: \n"
    "https://github.com/mindsdb/mindsdb_python_sdk/blob/staging/examples/using_agents_with_retrieval.py"
)

25 

26 

class RAGHandler(BaseMLEngine):
    """
    RAGHandler is a MindsDB integration with supported LLM APIs that allows users
    to run question answering on their data by providing a question.

    The user is able to provide data that provides context for the questions;
    see the create() method for more details.
    """

    name = "rag"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Mark this engine as generative so MindsDB treats its output as free-form text.
        self.generative = True

    @staticmethod
    def create_validation(target, args=None, **kwargs):
        """Validate CREATE MODEL arguments before create() is dispatched.

        Args:
            target: name of the target column (unused here, required by the interface).
            args: parsed statement arguments; must contain a "using" key.

        Raises:
            Exception: if no USING clause was supplied.
        """
        # Guard against args being None as well as missing the "using" key;
        # previously `"using" not in None` raised a TypeError instead of the
        # intended, user-readable error message.
        if args is None or "using" not in args:
            raise Exception(
                "RAG engine requires a USING clause! Refer to its documentation for more details."
            )

    def create(
        self,
        target: str,
        df: pd.DataFrame = None,
        args: Optional[Dict] = None,
    ):
        """
        Dispatch is running embeddings and storing in a VectorDB, unless user already has embeddings persisted.

        Args:
            target: target column name (unused; required by the BaseMLEngine interface).
            df: optional source data whose rows provide context for question answering.
            args: statement arguments; the "using" sub-dict carries all handler parameters.
        """
        # get api key from user input on create ML_ENGINE or create MODEL
        args = args["using"]

        ml_engine_args = self.engine_storage.get_connection_args()

        # for a model created with USING, only pull engine-level keys relevant
        # to the selected llm type (matched by substring on the key name)
        args.update({k: v for k, v in ml_engine_args.items() if args["llm_type"] in k})

        input_args = build_llm_params(args)

        # Validate and normalize all parameters into a typed settings object.
        args = RAGHandlerParameters(**input_args)

        # create folder for vector store to persist embeddings or load from existing folder
        args.vector_store_storage_path = self.engine_storage.folder_get(
            args.vector_store_folder_name
        )

        if args.run_embeddings:
            # NOTE(review): `"context_columns" not in args` tests membership on a
            # RAGHandlerParameters instance, not a dict — confirm this behaves as
            # intended for the pydantic version in use.
            if "context_columns" not in args and df is not None:
                # if no context columns provided, use all columns in df
                logger.info("No context columns provided, using all columns in df")
                args.context_columns = df.columns.tolist()

            if "embeddings_model_name" not in args:
                logger.info(
                    f"No embeddings model provided in query, using default model: {DEFAULT_EMBEDDINGS_MODEL}"
                )

            if df is not None or args.url is not None:
                # if user provides a dataframe or url, run embeddings and store in vector store
                ingestor = RAGIngestor(args=args, df=df)
                ingestor.embeddings_to_vectordb()

        else:
            # Note this should only be run if run_embeddings is false or if no data is provided in query
            logger.info("Skipping embeddings and ingestion into Chroma VectorDB")

        export_args = args.dict(exclude={"llm_params"})
        # 'callbacks' aren't json serializable, we do this to avoid errors
        export_args["llm_params"] = args.llm_params.dict(exclude={"callbacks"})

        # for mindsdb cloud, store data in shared file system
        # for cloud version of mindsdb to make it be usable by all mindsdb nodes
        self.engine_storage.folder_sync(args.vector_store_folder_name)

        self.model_storage.json_set("args", export_args)

    def update(self, args) -> None:
        """Merge new USING arguments into the stored model args after validation.

        Args:
            args: statement arguments; the "using" sub-dict carries the updates.
        """
        # build llm params from user input args in update query
        updated_args = build_llm_params(args["using"], update=True)

        # get current model args
        # NOTE(review): create() stores export_args without a "using" key, so this
        # lookup may KeyError — verify against how model_storage persists args.
        current_model_args = self.model_storage.json_get("args")["using"]

        # update current args with new args
        current_model_args.update(updated_args)

        # validate updated args are valid
        RAGHandlerParameters(**build_llm_params(current_model_args))

        # if valid, update model args
        self.model_storage.json_set("args", current_model_args)

    def predict(self, df: pd.DataFrame = None, args: dict = None):
        """
        Dispatch is performed depending on the underlying model type. Currently, only question answering
        is supported.

        Args:
            df: input rows; the configured input column holds the question text.
            args: unused here; required by the BaseMLEngine interface.

        Returns:
            pd.DataFrame: the question-answering response.
        """
        # Rebuild the validated parameter object from what create()/update() persisted.
        input_args = build_llm_params(self.model_storage.json_get("args"))

        args = RAGHandlerParameters(**input_args)

        args.vector_store_storage_path = self.engine_storage.folder_get(
            args.vector_store_folder_name
        )

        # get question answering results
        question_answerer = RAGQuestionAnswerer(args=args)

        # get question from sql query
        # e.g. where question = 'What is the capital of France?'
        # only the first row's question is answered
        response = question_answerer(df[args.input_column].tolist()[0])

        return pd.DataFrame(response)