Coverage for mindsdb / interfaces / database / data_handlers_cache.py: 86%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

import sys
import threading
import time
from collections import defaultdict
from dataclasses import dataclass, field

from mindsdb.integrations.libs.base import DatabaseHandler
from mindsdb.utilities.context import context as ctx
from mindsdb.utilities import log

# Module-level logger named after this module (``__name__``).
logger = log.getLogger(__name__)


@dataclass(kw_only=True, slots=True)
class HandlersCacheRecord:
    """Record for a handler in the cache

    Args:
        handler (DatabaseHandler): handler instance
        expired_at (float): wall-clock time (as returned by ``time.time()``) after which the record is expired
        connect_attempt_done (threading.Event): set once a call to `connect` has finished,
            whether or not the connection succeeded
        _wait_lock (threading.Lock): serializes concurrent callers of `wait_no_references`
    """

    handler: DatabaseHandler
    expired_at: float
    connect_attempt_done: threading.Event = field(default_factory=threading.Event)
    _wait_lock: threading.Lock = field(default_factory=threading.Lock)

    @property
    def expired(self) -> bool:
        """check if the handler is expired

        Returns:
            bool: True if the handler is expired, False otherwise
        """
        return self.expired_at < time.time()

    @property
    def has_references(self) -> bool:
        """check if something other than this record holds a reference to the handler

        Returns:
            bool: True if the handler has references, False otherwise
        """
        # Baseline of 2: the reference stored in this record plus the temporary reference
        # created by passing the handler as the argument of sys.getrefcount().
        # NOTE(review): relies on CPython reference counting; sys.getrefcount values are
        # implementation-specific and may differ between interpreter versions.
        return sys.getrefcount(self.handler) > 2

    def wait_no_references(self, timeout: int = 60) -> DatabaseHandler:
        """wait until nobody else holds a reference to the handler

        Concurrent callers are serialized by `_wait_lock`. The reference check is always performed
        at least once, even if `timeout` is 0 or the timeout elapsed while waiting for `_wait_lock`.

        Args:
            timeout (int): timeout in seconds (includes time spent waiting for `_wait_lock`)

        Returns:
            DatabaseHandler: handler instance

        Raises:
            Exception: if the handler still has references when the timeout expires
        """
        # Monotonic clock: the deadline must not move if the system wall clock is adjusted.
        deadline = time.monotonic() + timeout
        with self._wait_lock:
            while self.has_references:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise Exception("Handler has references")
                # Poll every 0.5s, but never sleep past the deadline.
                time.sleep(min(0.5, remaining))
            return self.handler

    def connect(self) -> None:
        """connect the handler if it is not connected yet

        Connection errors are logged and swallowed. `connect_attempt_done` is set in every case,
        so waiters are released even when the connection fails.
        """
        try:
            if not self.handler.is_connected:
                self.handler.connect()
        except Exception:
            logger.warning(f"Error connecting to handler: {self.handler.name}", exc_info=True)
        finally:
            self.connect_attempt_done.set()


class HandlersCache:
    """Cache for data handlers that keeps connections open for `ttl` seconds after the handler's last use.

    Records are grouped under the key ``(handler name, ctx.company_id, 0 | thread native id)``.
    The cache manages handlers based on the following handler attributes:
        - cache_thread_safe (default True): if True, the handler can be used in any thread, otherwise only in the
          thread that created it
        - cache_single_instance (default False): if True, only one instance of the handler can be in the cache
        - cache_usage_lock (default True): if True, the handler can be returned only if there are no references to it
          (no one uses it)

    A daemon thread (`_clean`) periodically disconnects and drops expired, unused handlers. It is started by `set`
    and exits when the cache becomes empty or a stop is requested.
    """

    def __init__(self, ttl: int = 60, clean_timeout: float = 3):
        """init cache

        Args:
            ttl (int): time to live (in seconds) for record in cache
            clean_timeout (float): interval (in seconds) between cleanups of expired handlers
        """
        self.ttl: int = ttl
        self._clean_timeout: float = clean_timeout
        # key: (handler name, company id, 0 for thread-safe handlers or the creating thread's native id)
        self.handlers: dict[tuple, list[HandlersCacheRecord]] = defaultdict(list)
        self._lock = threading.RLock()
        self._stop_event = threading.Event()
        self.cleaner_thread: threading.Thread | None = None

    def __del__(self):
        # __init__ may have failed before the stop event was created.
        if hasattr(self, "_stop_event"):
            self._stop_clean()

    def _start_clean(self) -> None:
        """start the worker that closes connections after ttl expired, unless it is already running"""
        with self._lock:
            # Cleared under the lock: a running worker re-checks the stop flag under the same lock before
            # exiting, so clearing it here keeps that worker alive instead of racing with its shutdown.
            self._stop_event.clear()
            if isinstance(self.cleaner_thread, threading.Thread) and self.cleaner_thread.is_alive():
                return
            self.cleaner_thread = threading.Thread(target=self._clean, name="HandlersCache.clean", daemon=True)
            self.cleaner_thread.start()

    def _stop_clean(self) -> None:
        """request the clean worker to stop"""
        self._stop_event.set()

    def set(self, handler: DatabaseHandler):
        """add handler to cache and connect it

        The record is appended under the cache lock. The connection attempt is then made in the calling thread,
        outside the lock. Concurrent `get` calls that pick this record wait (up to 10s) for the attempt to finish
        via `HandlersCacheRecord.connect_attempt_done`.

        Args:
            handler (DatabaseHandler): handler to cache. Its optional `cache_thread_safe` and
                `cache_single_instance` attributes control how it is cached.

        Raises:
            ValueError: if the handler has `cache_single_instance=True` and an instance with the same name
                is already cached
        """
        cache_thread_safe = getattr(handler, "cache_thread_safe", True)
        with self._lock:
            cache_single_instance = getattr(handler, "cache_single_instance", False)
            if cache_single_instance:
                records, _ = self._get_cache_records(handler.name)
                if len(records) > 0:
                    raise ValueError(
                        "Attempt to add to the HandlersCache second instance of handler with cache_single_instance=True"
                    )
            try:
                # If the handler is thread safe, use 0 as the last element of the key, otherwise the thread ID.
                key = (
                    handler.name,
                    ctx.company_id,
                    0 if cache_thread_safe else threading.get_native_id(),
                )
                record = HandlersCacheRecord(handler=handler, expired_at=time.time() + self.ttl)
                self.handlers[key].append(record)
            except Exception:
                logger.warning("Error setting data handler cache record:", exc_info=True)
                return
            # Still under the lock: the worker re-checks its exit condition under the same lock,
            # so it cannot miss the record that was just added.
            self._start_clean()
        record.connect()

    def _get_cache_records(self, name: str) -> tuple[list[HandlersCacheRecord], tuple]:
        """get cache records by name for the current company

        The thread-safe key (last element 0) is checked first. If it is absent, the key bound to the
        current thread's native id is used.

        Args:
            name (str): handler name

        Returns:
            tuple[list[HandlersCacheRecord], tuple]: cache records (empty list if none) and the key that was looked up
        """
        key = (name, ctx.company_id, 0)
        if key not in self.handlers:
            key = (name, ctx.company_id, threading.get_native_id())
        return self.handlers.get(key, []), key

    def get(self, name: str) -> DatabaseHandler | None:
        """get handler from cache by name

        Records are scanned in insertion order. A record is skipped while its handler is referenced elsewhere
        and `cache_usage_lock` is True. For an in-use `cache_single_instance` handler, the call blocks (outside
        the cache lock) until the handler is released, see `HandlersCacheRecord.wait_no_references`.
        The expiration time of the returned record is extended by `ttl`.

        Args:
            name (str): handler name

        Returns:
            DatabaseHandler | None: cached handler, or None if there is no usable record
        """
        record_to_wait = None
        found_record = None
        handler = None
        with self._lock:
            records, _ = self._get_cache_records(name)
            for record in records:
                cache_single_instance = getattr(record.handler, "cache_single_instance", False)
                cache_usage_lock = getattr(record.handler, "cache_usage_lock", True)
                has_references = record.has_references

                if cache_single_instance and has_references and cache_usage_lock:
                    # in this case - wait out of the current lock to prevent global lock of HandlerCache
                    record_to_wait = record
                    break

                if has_references and cache_usage_lock:
                    continue

                record.expired_at = time.time() + self.ttl
                # Take a strong reference while still holding the lock: from now on `has_references` is True
                # for concurrent callers, so a usage-locked handler is not handed out twice.
                handler = record.handler
                found_record = record
                break

        if found_record is not None:
            # Wait for the connection attempt started in `set` outside the cache lock, so a slow connection
            # does not block cache operations on other handlers.
            if found_record.connect_attempt_done.wait(timeout=10) is False:
                logger.warning(f"Handler's connection attempt has not finished in 10s: {handler.name}")
            return handler

        if record_to_wait:
            # NOTE(review): wait_no_references does not hold the cache lock, so a concurrent `get` could
            # presumably pick the same handler right after it is released - confirm whether this matters.
            handler = record_to_wait.wait_no_references()
            record_to_wait.expired_at = time.time() + self.ttl
            return handler

        return None

    def delete(self, name: str) -> None:
        """delete handler(s) from cache and disconnect them

        Disconnection happens while holding the cache lock, so `set` cannot add (and connect) a new instance
        until the removed ones are disconnected. Disconnect errors are logged at debug level and ignored.

        Args:
            name (str): handler name
        """
        with self._lock:
            records, key = self._get_cache_records(name)
            if len(records) > 0:
                del self.handlers[key]
            for record in records:
                try:
                    record.handler.disconnect()
                except Exception:
                    logger.debug("Error disconnecting data handler:", exc_info=True)

            if len(self.handlers) == 0:
                self._stop_clean()

    def _clean(self) -> None:
        """worker that deletes from cache handlers that were not in use for ttl

        Runs every `_clean_timeout` seconds. It exits when a stop is requested (`_stop_clean`) or the cache
        becomes empty. The exit decision and the reset of `cleaner_thread` happen under the cache lock, and
        `_start_clean` also runs under that lock. So `_start_clean` either keeps this worker running by clearing
        the stop flag, or sees `cleaner_thread is None` and starts a new worker. A newly added record is
        therefore never left without a cleaner.
        """
        while True:
            stop_requested = self._stop_event.wait(timeout=self._clean_timeout)
            with self._lock:
                if not stop_requested:
                    self._remove_expired()
                if self._stop_event.is_set() or len(self.handlers) == 0:
                    if self.cleaner_thread is threading.current_thread():
                        self.cleaner_thread = None
                    return

    def _remove_expired(self) -> None:
        """disconnect and drop expired records whose handlers are not referenced; caller must hold `self._lock`"""
        for key in list(self.handlers.keys()):
            active_handlers_list = []
            for record in self.handlers[key]:
                if record.expired and record.has_references is False:
                    try:
                        record.handler.disconnect()
                    except Exception:
                        logger.debug("Error disconnecting data handler:", exc_info=True)
                else:
                    active_handlers_list.append(record)
            if len(active_handlers_list) > 0:
                self.handlers[key] = active_handlers_list
            else:
                del self.handlers[key]