Coverage for mindsdb / integrations / handlers / rag_handler / ingest.py: 0%
75 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
2from typing import List
4import pandas as pd
6from mindsdb.integrations.handlers.rag_handler.settings import (
7 PersistedVectorStoreSaver,
8 PersistedVectorStoreSaverConfig,
9 RAGBaseParameters,
10 VectorStoreFactory,
11 df_to_documents,
12 get_chroma_client,
13 load_embeddings_model,
14 url_to_documents,
15)
16from mindsdb.utilities import log
17from langchain_core.documents import Document
18from langchain_core.vectorstores import VectorStore
19from langchain_text_splitters import RecursiveCharacterTextSplitter
21logger = log.getLogger(__name__)
def validate_document(doc) -> bool:
    """Return True if *doc* is a Document with non-empty page content."""
    # Anything that is not a langchain Document instance is rejected outright.
    if not isinstance(doc, Document):
        return False
    # An empty/falsy page_content makes the document useless for ingestion.
    return bool(doc.page_content)
def validate_documents(documents) -> bool:
    """Validate that *documents* is a non-empty list of well-formed Documents.

    Args:
        documents: Candidate value; expected to be a list of Document objects.

    Returns:
        True when the input is a non-empty list and every element passes
        ``validate_document``; False otherwise.
    """
    if not isinstance(documents, list):
        return False

    if not documents:
        return False

    # Generator expression avoids building an intermediate list and lets
    # all() short-circuit on the first invalid document.
    return all(validate_document(doc) for doc in documents)
class RAGIngestor:
    """A class for converting a dataframe and/or url to a vectorstore embedded with a given embeddings model"""

    def __init__(
        self,
        args: RAGBaseParameters,
        df: pd.DataFrame,
    ):
        """
        Args:
            args: Ingestion parameters (chunking, embeddings model, vector
                store selection and storage paths, optional url, ...).
            df: Source dataframe; may be None when only a url is ingested.
        """
        self.args = args
        self.df = df
        self.embeddings_model_name = args.embeddings_model_name
        # Resolve the concrete VectorStore class once, up front.
        self.vector_store = VectorStoreFactory.get_vectorstore_class(
            args.vector_store_name
        )

    def split_documents(self, chunk_size, chunk_overlap) -> list:
        """Load documents from the dataframe and/or url and split them into chunks.

        Args:
            chunk_size: Maximum size of each chunk produced by the splitter.
            chunk_overlap: Number of overlapping characters between chunks.

        Returns:
            List of chunked Document objects ready for embedding.
        """
        logger.info("Loading documents from input data")

        documents = []

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )

        if self.df is not None:
            # if user provides a dataframe, load documents from dataframe
            documents.extend(
                df_to_documents(
                    df=self.df,
                    page_content_columns=self.args.context_columns,
                    url_column_name=self.args.url_column_name,
                )
            )

        if self.args.url:
            # if user provides a url, load documents from url
            documents.extend(url_to_documents(self.args.url))

        # NOTE: this is a character count of the page content, not a model
        # tokenizer count — cheap proxy for logging purposes only.
        n_chars = sum(len(doc.page_content) for doc in documents)

        # split documents into chunks of text
        texts = text_splitter.split_documents(documents)
        logger.info(f"Loaded {len(documents)} documents from input data")
        logger.info(f"Total number of characters: {n_chars}")
        logger.info(f"Split into {len(texts)} chunks of text")

        return texts

    def create_db_from_documents(self, documents, embeddings_model) -> VectorStore:
        """Create DB from documents.

        For chromadb a persistent client is used; every other store falls back
        to the generic from_texts path.
        """
        if self.args.vector_store_name == "chromadb":
            return self.vector_store.from_documents(
                documents=documents,
                embedding=embeddings_model,
                client=get_chroma_client(
                    persist_directory=self.args.vector_store_storage_path
                ),
                collection_name=self.args.collection_name,
            )
        else:
            return self.create_db_from_texts(documents, embeddings_model)

    def create_db_from_texts(self, documents, embeddings_model) -> VectorStore:
        """Create DB from text content, preserving per-document metadata."""
        texts = [doc.page_content for doc in documents]
        metadata = [doc.metadata for doc in documents]

        return self.vector_store.from_texts(
            texts=texts, embedding=embeddings_model, metadatas=metadata
        )

    @staticmethod
    def _create_batch_embeddings(documents: List[Document], embeddings_batch_size):
        """Yield successive slices of *documents* of size *embeddings_batch_size*."""
        for i in range(0, len(documents), embeddings_batch_size):
            yield documents[i: i + embeddings_batch_size]

    def embeddings_to_vectordb(self) -> None:
        """Create vectorstore from documents and store locally.

        Raises:
            ValueError: If the loaded documents fail validation.
            Exception: If the vector store rejects the embedded documents.
        """
        start_time = time.time()

        # Load documents and split into chunks.
        documents = self.split_documents(
            chunk_size=self.args.chunk_size, chunk_overlap=self.args.chunk_overlap
        )

        # Load embeddings model
        embeddings_model = load_embeddings_model(
            self.embeddings_model_name, self.args.use_gpu
        )

        logger.info("Creating vectorstore from documents")

        if not validate_documents(documents):
            raise ValueError("Invalid documents")

        try:
            db = self.create_db_from_documents(documents, embeddings_model)
        except Exception as e:
            # Chain the original exception so the root cause is not lost.
            raise Exception(
                f"Error loading embeddings to {self.args.vector_store_name}: {e}"
            ) from e

        config = PersistedVectorStoreSaverConfig(
            vector_store_name=self.args.vector_store_name,
            vector_store=db,
            persist_directory=self.args.vector_store_storage_path,
            collection_name=self.args.collection_name,
        )

        vector_store_saver = PersistedVectorStoreSaver(config)

        vector_store_saver.save_vector_store(db)

        db = None  # Free up memory

        end_time = time.time()
        elapsed_time = round(end_time - start_time)

        logger.info(f"Finished creating {self.args.vector_store_name} from texts, it has been "
                    f"persisted to {self.args.vector_store_storage_path}")

        time_minutes = round(elapsed_time / 60)

        if time_minutes > 1:
            logger.info(f"Elapsed time: {time_minutes} minutes")
        else:
            logger.info(f"Elapsed time: {elapsed_time} seconds")