Coverage for mindsdb/integrations/utilities/rag/vector_store.py: 36%
57 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
2from datetime import timedelta
3from typing import List
5from langchain_core.documents import Document
6from langchain_core.embeddings import Embeddings
7from langchain_core.vectorstores import VectorStore
9from mindsdb.integrations.utilities.rag.loaders.vector_store_loader.vector_store_loader import VectorStoreLoader
10from mindsdb.integrations.utilities.rag.settings import VectorStoreConfig, SearchKwargs
# Default limits; the source labels these as gpt-3.5-turbo values.
_DEFAULT_TPM_LIMIT = 60000
_DEFAULT_RATE_LIMIT_INTERVAL = timedelta(seconds=10)
_INITIAL_TOKEN_USAGE = 0


class VectorStoreOperator:
    """
    Encapsulates the logic for adding documents to a vector store with rate limiting.

    Documents are added one at a time. Before each addition the running usage
    counter is compared with ``token_per_minute_limit``; once the counter has
    reached the limit the operator sleeps for ``rate_limit_interval`` and
    resets the counter to zero.
    """

    def __init__(self,
                 vector_store: VectorStore,
                 embedding_model: Embeddings,
                 documents: List[Document] = None,
                 vector_store_config: VectorStoreConfig = None,
                 token_per_minute_limit: int = _DEFAULT_TPM_LIMIT,
                 rate_limit_interval: timedelta = _DEFAULT_RATE_LIMIT_INTERVAL,
                 search_kwargs: SearchKwargs = None

                 ):
        """
        :param vector_store: a ``VectorStore`` instance, or a ``VectorStore`` class
        :param embedding_model: embeddings model used when a store has to be built
        :param documents: optional documents to load into a freshly built store
        :param vector_store_config: config used to load a store when a class is given
        :param token_per_minute_limit: usage count at which the operator pauses
        :param rate_limit_interval: how long to pause once the limit is reached
        :param search_kwargs: search settings; a default ``SearchKwargs()`` is used when falsy
        """
        self.documents = documents
        self.embedding_model = embedding_model
        self.token_per_minute_limit = token_per_minute_limit
        self.rate_limit_interval = rate_limit_interval
        self.current_token_usage = _INITIAL_TOKEN_USAGE
        self._vector_store = None
        self.vector_store_config = vector_store_config
        self.search_kwargs = search_kwargs or SearchKwargs()

        self.verify_vector_store(vector_store, documents)

    def verify_vector_store(self, vector_store, documents):
        """
        Resolve ``vector_store`` into the store held by this operator.

        * With documents: build a new store from them via ``vector_store.from_documents``.
        * With a ``VectorStore`` instance: use it as is.
        * With a ``VectorStore`` subclass: load a store from ``vector_store_config``.

        :raises TypeError: if there are no documents and ``vector_store`` is neither
            a ``VectorStore`` instance nor a class
        """
        if documents:
            self._add_documents_to_store(documents, vector_store)
        elif isinstance(vector_store, VectorStore):
            # already a ready-to-use instance (of VectorStore or any subclass)
            self._vector_store = vector_store
        elif not isinstance(vector_store, type):
            # issubclass() would raise an opaque TypeError here; raise the same
            # exception type with a message that names the actual problem.
            raise TypeError(
                "vector_store must be a VectorStore instance or a VectorStore subclass, "
                f"got {type(vector_store).__name__}"
            )
        elif issubclass(vector_store, VectorStore):
            # a class was given: create the instance from vector_store_config
            self._vector_store = load_vector_store(self.embedding_model, self.vector_store_config)

    @property
    def vector_store(self):
        """The resolved vector store, or ``None`` if none has been set."""
        return self._vector_store

    @staticmethod
    def _calculate_token_usage(document):
        """Approximate a document's token usage by the length of its ``page_content``."""
        return len(document.page_content)

    def _rate_limit(self):
        """Sleep for ``rate_limit_interval`` and reset the counter once the limit is reached."""
        if self.current_token_usage >= self.token_per_minute_limit:
            time.sleep(self.rate_limit_interval.total_seconds())
            self.current_token_usage = _INITIAL_TOKEN_USAGE

    def _update_token_usage(self, document: Document):
        """Apply the rate limit, then add ``document``'s usage to the running counter."""
        self._rate_limit()
        self.current_token_usage += self._calculate_token_usage(document)

    def _add_document(self, document: Document):
        """Account for ``document`` and add it to the current vector store."""
        self._update_token_usage(document)
        self.vector_store.add_documents([document])

    def _add_documents_to_store(self, documents: List[Document], vector_store: VectorStore):
        """Build the store from the first document, then add the remaining ones."""
        self._init_vector_store(documents, vector_store)
        # The first document is already stored by from_documents(); adding it
        # again would duplicate it in the store.
        self.add_documents(documents[1:])

    def _init_vector_store(self, documents: List[Document], vector_store: VectorStore):
        """Create the store from the first document; does nothing for an empty list."""
        if len(documents) > 0:
            first_document = documents[0]
            # Count the seed document too, so total usage covers every document.
            self._update_token_usage(first_document)
            self._vector_store = vector_store.from_documents(
                documents=[first_document], embedding=self.embedding_model
            )

    def add_documents(self, documents: List[Document]):
        """Add each document to the store one by one, applying the rate limit."""
        for document in documents:
            self._add_document(document)
def load_vector_store(embedding_model: Embeddings, config: VectorStoreConfig) -> VectorStore:
    """
    Build a vector store from the given embeddings model and configuration.

    :param embedding_model: embeddings model handed to ``VectorStoreLoader``
    :param config: vector store configuration handed to ``VectorStoreLoader``
    :return: the result of ``VectorStoreLoader.load()`` for these inputs
    """
    return VectorStoreLoader(embedding_model=embedding_model, config=config).load()