Coverage for mindsdb/integrations/handlers/rag_handler/ingest.py: 0%

75 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import time 

2from typing import List 

3 

4import pandas as pd 

5 

6from mindsdb.integrations.handlers.rag_handler.settings import ( 

7 PersistedVectorStoreSaver, 

8 PersistedVectorStoreSaverConfig, 

9 RAGBaseParameters, 

10 VectorStoreFactory, 

11 df_to_documents, 

12 get_chroma_client, 

13 load_embeddings_model, 

14 url_to_documents, 

15) 

16from mindsdb.utilities import log 

17from langchain_core.documents import Document 

18from langchain_core.vectorstores import VectorStore 

19from langchain_text_splitters import RecursiveCharacterTextSplitter 

20 

# Module-level logger for this handler, obtained via mindsdb's logging utility.
logger = log.getLogger(__name__)

22 

23 

def validate_document(doc) -> bool:
    """Check a single document.

    A document is valid when it is a langchain ``Document`` instance
    whose ``page_content`` is non-empty.
    """
    # Both conditions must hold; a non-Document or an empty body fails.
    return isinstance(doc, Document) and bool(doc.page_content)

34 

35 

def validate_documents(documents) -> bool:
    """Validate document list format.

    Returns True only when ``documents`` is a non-empty list in which
    every element passes :func:`validate_document`.
    """
    # Reject anything that is not a non-empty list.
    if not isinstance(documents, list) or not documents:
        return False

    # Generator (not a list) lets all() short-circuit on the first
    # invalid document instead of validating every element up front.
    return all(validate_document(doc) for doc in documents)

47 

48 

class RAGIngestor:
    """A class for converting a dataframe and/or url to a vectorstore embedded with a given embeddings model"""

    def __init__(
        self,
        args: RAGBaseParameters,
        df: pd.DataFrame,
    ):
        self.args = args
        self.df = df
        self.embeddings_model_name = args.embeddings_model_name

        # Resolve the concrete VectorStore class by its configured name.
        self.vector_store = VectorStoreFactory.get_vectorstore_class(
            args.vector_store_name
        )

    def split_documents(self, chunk_size, chunk_overlap) -> list:
        """Load documents from the dataframe and/or url and split them into chunks.

        Returns the list of chunked langchain documents produced by
        RecursiveCharacterTextSplitter.
        """
        logger.info("Loading documents from input data")

        documents = []

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )

        if self.df is not None:
            # if user provides a dataframe, load documents from dataframe
            documents.extend(
                df_to_documents(
                    df=self.df,
                    page_content_columns=self.args.context_columns,
                    url_column_name=self.args.url_column_name,
                )
            )

        if self.args.url:
            # if user provides a url, load documents from url
            documents.extend(url_to_documents(self.args.url))

        # NOTE(review): this sums character counts, not model tokens — the
        # "tokens" wording in the log messages below is approximate.
        n_tokens = sum(len(doc.page_content) for doc in documents)

        # split documents into chunks of text
        texts = text_splitter.split_documents(documents)
        logger.info(f"Loaded {len(documents)} documents from input data")
        logger.info(f"Total number of tokens: {n_tokens}")
        logger.info(f"Split into {len(texts)} chunks of text (tokens)")

        return texts

    def create_db_from_documents(self, documents, embeddings_model) -> VectorStore:
        """Create DB from documents.

        chromadb gets a dedicated client/collection; any other store falls
        back to the generic from_texts path.
        """
        if self.args.vector_store_name == "chromadb":
            return self.vector_store.from_documents(
                documents=documents,
                embedding=embeddings_model,
                client=get_chroma_client(
                    persist_directory=self.args.vector_store_storage_path
                ),
                collection_name=self.args.collection_name,
            )
        else:
            return self.create_db_from_texts(documents, embeddings_model)

    def create_db_from_texts(self, documents, embeddings_model) -> VectorStore:
        """Create DB from text content.

        Splits each document into its page content and metadata and builds
        the vector store via from_texts.
        """
        texts = [doc.page_content for doc in documents]
        metadata = [doc.metadata for doc in documents]

        return self.vector_store.from_texts(
            texts=texts, embedding=embeddings_model, metadatas=metadata
        )

    @staticmethod
    def _create_batch_embeddings(documents: List[Document], embeddings_batch_size):
        """Yield successive slices of ``documents`` of size ``embeddings_batch_size``.

        The final batch may be shorter than the batch size.
        """
        for i in range(0, len(documents), embeddings_batch_size):
            yield documents[i: i + embeddings_batch_size]

    def embeddings_to_vectordb(self) -> None:
        """Create vectorstore from documents and store locally.

        Raises:
            ValueError: if the loaded documents fail validation.
            Exception: if loading embeddings into the vector store fails
                (original error preserved via exception chaining).
        """
        start_time = time.time()

        # Load documents and splits in chunks (if not in evaluation_type mode)
        documents = self.split_documents(
            chunk_size=self.args.chunk_size, chunk_overlap=self.args.chunk_overlap
        )

        # Load embeddings model
        embeddings_model = load_embeddings_model(
            self.embeddings_model_name, self.args.use_gpu
        )

        logger.info("Creating vectorstore from documents")

        if not validate_documents(documents):
            raise ValueError("Invalid documents")

        try:
            db = self.create_db_from_documents(documents, embeddings_model)
        except Exception as e:
            # Chain the original exception so the underlying traceback
            # is not lost when the error propagates to the caller.
            raise Exception(
                f"Error loading embeddings to {self.args.vector_store_name}: {e}"
            ) from e

        config = PersistedVectorStoreSaverConfig(
            vector_store_name=self.args.vector_store_name,
            vector_store=db,
            persist_directory=self.args.vector_store_storage_path,
            collection_name=self.args.collection_name,
        )

        vector_store_saver = PersistedVectorStoreSaver(config)

        vector_store_saver.save_vector_store(db)

        db = None  # Free up memory

        end_time = time.time()
        elapsed_time = round(end_time - start_time)

        logger.info(f"Finished creating {self.args.vector_store_name} from texts, it has been "
                    f"persisted to {self.args.vector_store_storage_path}")

        time_minutes = round(elapsed_time / 60)

        if time_minutes > 1:
            logger.info(f"Elapsed time: {time_minutes} minutes")
        else:
            logger.info(f"Elapsed time: {elapsed_time} seconds")