Coverage for mindsdb / interfaces / database / data_handlers_cache.py: 86%
129 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import sys
2import time
3import threading
4from dataclasses import dataclass, field
5from collections import defaultdict
7from mindsdb.integrations.libs.base import DatabaseHandler
8from mindsdb.utilities.context import context as ctx
9from mindsdb.utilities import log
11logger = log.getLogger(__name__)
@dataclass(kw_only=True, slots=True)
class HandlersCacheRecord:
    """Record for a handler in the cache.

    Args:
        handler (DatabaseHandler): handler instance
        expired_at (float): unix timestamp after which the handler is considered expired
        connect_attempt_done (threading.Event): set once a connection attempt has
            finished (successfully or not), so waiters are never blocked forever
    """

    handler: DatabaseHandler
    expired_at: float
    connect_attempt_done: threading.Event = field(default_factory=threading.Event)
    # Serializes concurrent wait_no_references() callers so only one polls at a time.
    _wait_lock: threading.Lock = field(default_factory=threading.Lock)

    @property
    def expired(self) -> bool:
        """Check if the handler is expired.

        Returns:
            bool: True if the handler is expired, False otherwise
        """
        return self.expired_at < time.time()

    @property
    def has_references(self) -> bool:
        """Check if something besides this cache record holds a reference to the handler.

        Returns:
            bool: True if the handler has external references, False otherwise
        """
        # getrefcount() itself holds one temporary reference and this record's
        # `handler` attribute holds another, so a count above 2 means an outside user.
        return sys.getrefcount(self.handler) > 2

    def wait_no_references(self, timeout: int = 60) -> DatabaseHandler:
        """Wait (polling every 0.5s) until the handler has no external references.

        Args:
            timeout (int): timeout in seconds

        Returns:
            DatabaseHandler: handler instance

        Raises:
            Exception: if the handler still has references after `timeout` seconds
        """
        end_time = time.time() + timeout
        with self._wait_lock:
            while time.time() < end_time:
                if self.has_references is False:
                    return self.handler
                time.sleep(0.5)
            # Typo fixed: previous message read "Hanlder has references".
            raise Exception("Handler has references")

    def connect(self) -> None:
        """Attempt to open the handler's connection (best effort).

        Errors are logged rather than raised; `connect_attempt_done` is always set
        in the `finally` so that waiters are unblocked even on failure.
        """
        try:
            if not self.handler.is_connected:
                self.handler.connect()
        except Exception:
            logger.warning(f"Error connecting to handler: {self.handler.name}", exc_info=True)
        finally:
            self.connect_attempt_done.set()
class HandlersCache:
    """Cache for data handlers that keeps connections opened during `ttl` seconds from the
    handler's last use.

    The cache manages handlers based on the following handler attributes:
     - cache_thread_safe (default True): if True, the handler can be used in any thread,
       otherwise only in the thread that created it
     - cache_single_instance (default False): if True, only one instance of the handler
       can be in the cache
     - cache_usage_lock (default True): if True, the handler can be returned only if there
       are no references to it (no one is using it)
    """

    def __init__(self, ttl: int = 60, clean_timeout: float = 3):
        """Init cache.

        Args:
            ttl (int): time to live (in seconds) for a record in cache
            clean_timeout (float): interval between cleanups of expired handlers
        """
        self.ttl: int = ttl
        self._clean_timeout: float = clean_timeout
        # Records are grouped by key (name, company_id, thread_id-or-0); see set().
        self.handlers: dict[tuple, list[HandlersCacheRecord]] = defaultdict(list)
        # RLock: methods that hold it (e.g. delete) may call others that take it again.
        self._lock = threading.RLock()
        self._stop_event = threading.Event()
        self.cleaner_thread = None

    def __del__(self):
        # Best-effort shutdown of the cleaner thread when the cache is garbage-collected.
        self._stop_clean()

    def _start_clean(self) -> None:
        """Start worker that closes connections after ttl expired."""
        # Fast path: cleaner already running, nothing to do.
        if isinstance(self.cleaner_thread, threading.Thread) and self.cleaner_thread.is_alive():
            return
        with self._lock:
            self._stop_event.clear()
            self.cleaner_thread = threading.Thread(target=self._clean, name="HandlersCache.clean")
            # Daemon thread: must not keep the process alive on shutdown.
            self.cleaner_thread.daemon = True
            self.cleaner_thread.start()

    def _stop_clean(self) -> None:
        """Stop clean worker (the worker loop exits when the event is set)."""
        self._stop_event.set()

    def set(self, handler: DatabaseHandler):
        """Add (or replace) handler in cache.

        NOTE: If the handler is not thread-safe, then use a lock when making connection.
        Otherwise, make connection in the same thread without using a lock to speed up
        parallel queries. (They don't need to wait for a connection in another thread.)

        Args:
            handler (DatabaseHandler): handler instance to cache

        Raises:
            ValueError: if a second instance of a handler with
                cache_single_instance=True is added
        """
        cache_thread_safe = getattr(handler, "cache_thread_safe", True)
        with self._lock:
            cache_single_instance = getattr(handler, "cache_single_instance", False)
            if cache_single_instance:
                records, _ = self._get_cache_records(handler.name)
                if len(records) > 0:
                    raise ValueError(
                        "Attempt to add to the HandlersCache second instance of handler with cache_single_instance=True"
                    )
            try:
                # If the handler is defined to be thread safe, set 0 as the last element of the key, otherwise set the thread ID.
                key = (
                    handler.name,
                    ctx.company_id,
                    0 if cache_thread_safe else threading.get_native_id(),
                )
                record = HandlersCacheRecord(handler=handler, expired_at=time.time() + self.ttl)
                self.handlers[key].append(record)
            except Exception:
                # Caching is best-effort: failure to cache must not break the caller.
                logger.warning("Error setting data handler cache record:", exc_info=True)
                return
        self._start_clean()
        # Connect outside the cache lock so other threads are not blocked on a slow connect.
        record.connect()

    def _get_cache_records(self, name: str) -> tuple[list[HandlersCacheRecord], tuple]:
        """Get cache records by name.

        Args:
            name (str): handler name

        Returns:
            tuple[list[HandlersCacheRecord], tuple]: cache records (possibly empty) and
                the key of the handler in cache
        """
        # If the handler is not thread safe, the thread ID will be assigned to the last element of the key.
        key = (name, ctx.company_id, 0)
        if key not in self.handlers:
            key = (name, ctx.company_id, threading.get_native_id())
        return self.handlers.get(key, []), key

    def get(self, name: str) -> DatabaseHandler | None:
        """Get handler from cache by name.

        Args:
            name (str): handler name

        Returns:
            DatabaseHandler | None: a cached handler, or None if no usable record exists
        """
        record_to_wait = None
        with self._lock:
            records, _ = self._get_cache_records(name)
            for record in records:
                cache_single_instance = getattr(record.handler, "cache_single_instance", False)
                cache_usage_lock = getattr(record.handler, "cache_usage_lock", True)
                has_references = record.has_references

                if cache_single_instance and has_references and cache_usage_lock:
                    # In this case - wait out of the current lock to prevent global lock of HandlerCache.
                    record_to_wait = record
                    break

                if has_references and cache_usage_lock:
                    # Record is in use by someone else; try the next one.
                    continue

                # Refresh the ttl on use.
                record.expired_at = time.time() + self.ttl
                if record.connect_attempt_done.wait(timeout=10) is False:
                    logger.warning(f"Handler's connection attempt has not finished in 10s: {record.handler.name}")
                return record.handler

        if record_to_wait:
            # Block (outside the cache lock) until the single instance is free again.
            handler = record_to_wait.wait_no_references()
            record_to_wait.expired_at = time.time() + self.ttl
            return handler

        return None

    def delete(self, name: str) -> None:
        """Delete handler from cache and disconnect its records.

        Args:
            name (str): handler name
        """
        with self._lock:
            records, key = self._get_cache_records(name)
            if len(records) > 0:
                del self.handlers[key]
            for record in records:
                try:
                    record.handler.disconnect()
                except Exception:
                    # Disconnect is best-effort; the record is already removed.
                    logger.debug("Error disconnecting data handler:", exc_info=True)

            if len(self.handlers) == 0:
                # Nothing left to watch: stop the cleaner thread.
                self._stop_clean()

    def _clean(self) -> None:
        """Worker that deletes from cache handlers that were not in use for ttl."""
        # wait() doubles as the sleep between sweeps; returns True once _stop_event is set.
        while self._stop_event.wait(timeout=self._clean_timeout) is False:
            with self._lock:
                for key in list(self.handlers.keys()):
                    active_handlers_list = []
                    for record in self.handlers[key]:
                        # Only evict records that are both expired and unreferenced.
                        if record.expired and record.has_references is False:
                            try:
                                record.handler.disconnect()
                            except Exception:
                                logger.debug("Error disconnecting data handler:", exc_info=True)
                        else:
                            active_handlers_list.append(record)
                    if len(active_handlers_list) > 0:
                        self.handlers[key] = active_handlers_list
                    else:
                        del self.handlers[key]

                if len(self.handlers) == 0:
                    # Cache is empty: signal this worker to exit.
                    self._stop_event.set()