Coverage for mindsdb/integrations/handlers/rag_handler/rag_handler.py: 0%
52 statements
from typing import Dict, Optional

import pandas as pd

from mindsdb.integrations.handlers.rag_handler.ingest import RAGIngestor
from mindsdb.integrations.handlers.rag_handler.rag import RAGQuestionAnswerer
from mindsdb.integrations.handlers.rag_handler.settings import (
    DEFAULT_EMBEDDINGS_MODEL,
    RAGHandlerParameters,
    build_llm_params,
)
from mindsdb.integrations.libs.base import BaseMLEngine
from mindsdb.utilities import log

logger = log.getLogger(__name__)

logger.warning("\nThe RAG handler has been deprecated and is no longer being actively supported. \n"
               "It will be fully removed in v24.8.x.x. "
               "For RAG workflows, please migrate to "
               "Agents + Retrieval skill. \n"
               "Example usage can be found here: \n"
               "https://github.com/mindsdb/mindsdb_python_sdk/blob/staging/examples"
               "/using_agents_with_retrieval.py")


class RAGHandler(BaseMLEngine):
    """
    RAGHandler is a MindsDB integration with supported LLM APIs that allows users to run
    question answering on their data by providing a question.

    The user can provide data that serves as context for the questions; see the create() method for more details.
    """

    name = "rag"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.generative = True

    @staticmethod
    def create_validation(target, args=None, **kwargs):
        if "using" not in args:
            raise Exception(
                "RAG engine requires a USING clause! Refer to its documentation for more details."
            )

    def create(
        self,
        target: str,
        df: pd.DataFrame = None,
        args: Optional[Dict] = None,
    ):
        """
        Runs embeddings and stores them in a vector database, unless the user already has embeddings persisted.
        """
        # get api key from user input on create ML_ENGINE or create MODEL
        args = args["using"]

        ml_engine_args = self.engine_storage.get_connection_args()

        # for a model created with USING, only get api for that specific llm type
        args.update({k: v for k, v in ml_engine_args.items() if args["llm_type"] in k})
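        # For example (illustrative, not taken from this file): with llm_type = "openai",
        # any ML engine connection arg whose key contains "openai" (e.g. an assumed
        # "openai_api_key") would be merged into args by the update above.
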
        input_args = build_llm_params(args)

        args = RAGHandlerParameters(**input_args)

        # create folder for vector store to persist embeddings or load from existing folder
        args.vector_store_storage_path = self.engine_storage.folder_get(
            args.vector_store_folder_name
        )

        if args.run_embeddings:
            if "context_columns" not in args and df is not None:
                # if no context columns provided, use all columns in df
                logger.info("No context columns provided, using all columns in df")
                args.context_columns = df.columns.tolist()

            if "embeddings_model_name" not in args:
                logger.info(
                    f"No embeddings model provided in query, using default model: {DEFAULT_EMBEDDINGS_MODEL}"
                )

            if df is not None or args.url is not None:
                # if user provides a dataframe or url, run embeddings and store in vector store
                ingestor = RAGIngestor(args=args, df=df)
                ingestor.embeddings_to_vectordb()

        else:
            # Note this should only be run if run_embeddings is false or if no data is provided in query
            logger.info("Skipping embeddings and ingestion into Chroma VectorDB")

        export_args = args.dict(exclude={"llm_params"})
        # 'callbacks' aren't json serializable, we do this to avoid errors
        export_args["llm_params"] = args.llm_params.dict(exclude={"callbacks"})

        # for MindsDB Cloud, sync the vector store folder to the shared file system
        # so that it is usable by all MindsDB nodes
        self.engine_storage.folder_sync(args.vector_store_folder_name)

        self.model_storage.json_set("args", export_args)

    def update(self, args) -> None:
        # build llm params from user input args in update query
        updated_args = build_llm_params(args["using"], update=True)

        # get current model args
        current_model_args = self.model_storage.json_get("args")["using"]

        # update current args with new args
        current_model_args.update(updated_args)

        # validate updated args are valid
        RAGHandlerParameters(**build_llm_params(current_model_args))

        # if valid, update model args
        self.model_storage.json_set("args", current_model_args)

    def predict(self, df: pd.DataFrame = None, args: dict = None):
        """
        Dispatch is performed depending on the underlying model type. Currently, only question answering
        is supported.
        """
        input_args = build_llm_params(self.model_storage.json_get("args"))

        args = RAGHandlerParameters(**input_args)

        args.vector_store_storage_path = self.engine_storage.folder_get(
            args.vector_store_folder_name
        )

        # get question answering results
        question_answerer = RAGQuestionAnswerer(args=args)

        # get question from sql query
        # e.g. where question = 'What is the capital of France?'
        response = question_answerer(df[args.input_column].tolist()[0])

        return pd.DataFrame(response)

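# Illustrative only (not part of this file): once created, the model would typically be
# queried through SQL, with the question passed in the WHERE clause as hinted by the
# comment in predict(); table and column names here are assumptions:
#
#   SELECT answer
#   FROM my_rag_model
#   WHERE question = 'What is the capital of France?';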