Coverage for mindsdb / integrations / utilities / rag / vector_store.py: 36%

57 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import time 

2from datetime import timedelta 

3from typing import List 

4 

5from langchain_core.documents import Document 

6from langchain_core.embeddings import Embeddings 

7from langchain_core.vectorstores import VectorStore 

8 

9from mindsdb.integrations.utilities.rag.loaders.vector_store_loader.vector_store_loader import VectorStoreLoader 

10from mindsdb.integrations.utilities.rag.settings import VectorStoreConfig, SearchKwargs 

11 

# Default token budget used for rate limiting (the original note ties this
# value to gpt-3.5-turbo).
_DEFAULT_TPM_LIMIT = 60_000

# Default pause applied once the token budget has been reached.
_DEFAULT_RATE_LIMIT_INTERVAL = timedelta(seconds=10)

# Value the running token counter starts from and is reset to after a pause.
_INITIAL_TOKEN_USAGE = 0

16 

17 

class VectorStoreOperator:
    """
    Encapsulates the logic for adding documents to a vector store with rate limiting.

    Token usage is approximated by the character length of each document's
    ``page_content``. Once the running total reaches ``token_per_minute_limit``
    the operator sleeps for ``rate_limit_interval`` and resets the counter
    before the next document is processed.
    """

    def __init__(self,
                 vector_store: VectorStore,
                 embedding_model: Embeddings,
                 documents: List[Document] = None,
                 vector_store_config: VectorStoreConfig = None,
                 token_per_minute_limit: int = _DEFAULT_TPM_LIMIT,
                 rate_limit_interval: timedelta = _DEFAULT_RATE_LIMIT_INTERVAL,
                 search_kwargs: SearchKwargs = None

                 ):
        """
        :param vector_store: a ``VectorStore`` instance or a ``VectorStore`` subclass.
            When ``documents`` are supplied, ``from_documents`` is called on it to
            build the store; otherwise an instance is used as-is and a subclass
            triggers loading through ``vector_store_config``.
        :param embedding_model: embeddings used when creating or loading the store.
        :param documents: optional documents to insert at construction time.
        :param vector_store_config: config passed to ``load_vector_store`` when
            ``vector_store`` is a class and no documents are given.
        :param token_per_minute_limit: usage threshold that triggers a pause.
        :param rate_limit_interval: how long to pause once the threshold is reached.
        :param search_kwargs: search settings; defaults to ``SearchKwargs()``.
        """
        self.documents = documents
        self.embedding_model = embedding_model
        self.token_per_minute_limit = token_per_minute_limit
        self.rate_limit_interval = rate_limit_interval
        self.current_token_usage = _INITIAL_TOKEN_USAGE
        self._vector_store = None
        self.vector_store_config = vector_store_config
        self.search_kwargs = search_kwargs or SearchKwargs()

        self.verify_vector_store(vector_store, documents)

    def verify_vector_store(self, vector_store, documents):
        """
        Resolve ``self._vector_store`` from the constructor arguments.

        :param vector_store: ``VectorStore`` instance or subclass.
        :param documents: documents to insert; when non-empty they take priority
            and a new store is built from them.
        :raises TypeError: if no documents are given and ``vector_store`` is
            neither a ``VectorStore`` instance nor a class.
        """
        if documents:
            self._add_documents_to_store(documents, vector_store)
        elif isinstance(vector_store, VectorStore):
            # Already a ready-to-use instance (of VectorStore or a subclass).
            self._vector_store = vector_store
        elif not isinstance(vector_store, type):
            # issubclass() would raise an opaque TypeError for non-class values
            # (e.g. None); raise the same exception type with a useful message.
            raise TypeError(
                "vector_store must be a VectorStore instance or subclass, "
                f"got {type(vector_store).__name__}"
            )
        elif issubclass(vector_store, VectorStore):
            # A VectorStore class was given: build the instance from vector_store_config.
            self._vector_store = load_vector_store(self.embedding_model, self.vector_store_config)
        # A class that is not a VectorStore subclass leaves the store unset (None),
        # as before.

    @property
    def vector_store(self):
        """The resolved vector store, or ``None`` if none could be resolved."""
        return self._vector_store

    @staticmethod
    def _calculate_token_usage(document):
        """Approximate a document's token usage by its ``page_content`` length."""
        return len(document.page_content)

    def _rate_limit(self):
        """Sleep for ``rate_limit_interval`` and reset the counter once the limit is reached."""
        if self.current_token_usage >= self.token_per_minute_limit:
            time.sleep(self.rate_limit_interval.total_seconds())
            self.current_token_usage = _INITIAL_TOKEN_USAGE

    def _update_token_usage(self, document: Document):
        """Apply rate limiting, then add ``document``'s usage to the running total."""
        self._rate_limit()
        self.current_token_usage += self._calculate_token_usage(document)

    def _add_document(self, document: Document):
        """Account for ``document``'s token usage and add it to the store."""
        self._update_token_usage(document)
        self.vector_store.add_documents([document])

    def _add_documents_to_store(self, documents: List[Document], vector_store: VectorStore):
        """
        Create the store from the first document, then add the remaining ones.

        The first document is already stored by ``_init_vector_store``, so only
        ``documents[1:]`` are added afterwards; adding the full list again would
        insert the first document twice.
        """
        self._init_vector_store(documents, vector_store)
        self.add_documents(documents[1:])

    def _init_vector_store(self, documents: List[Document], vector_store: VectorStore):
        """
        Build ``self._vector_store`` from the first document via ``from_documents``.

        Does nothing when ``documents`` is empty.
        """
        if not documents:
            return
        first_document = documents[0]
        # The seeding call embeds a document too, so it counts toward the budget.
        self._update_token_usage(first_document)
        self._vector_store = vector_store.from_documents(
            documents=[first_document], embedding=self.embedding_model
        )

    def add_documents(self, documents: List[Document]):
        """Add each document to the store one at a time, applying rate limiting."""
        for document in documents:
            self._add_document(document)

89 

90 

def load_vector_store(embedding_model: Embeddings, config: VectorStoreConfig) -> VectorStore:
    """
    Build a vector store from a config and an embeddings model.

    Delegates to ``VectorStoreLoader``: a loader is constructed with the given
    arguments and the result of its ``load()`` call is returned.

    :param embedding_model: embeddings model handed to the loader
    :param config: vector store configuration handed to the loader
    :return: whatever ``VectorStoreLoader.load()`` returns
    """
    return VectorStoreLoader(embedding_model=embedding_model, config=config).load()