Coverage for mindsdb / integrations / libs / ml_handler_process / handlers_cacher.py: 50%
16 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
2from collections import UserDict
class HandlersCache(UserDict):
    """Size-bounded mapping of handlers with least-recently-used eviction.

    Each value is stored in ``self.data`` wrapped in a record of the form
    ``{'last_usage_at': <time.time() float>, 'handler': <value>}``. The
    timestamp is refreshed on every write and on every ``cache[key]`` read,
    and the record with the smallest timestamp is discarded when room is
    needed for a new key.
    """

    def __init__(self, max_size: int = 5) -> None:
        """Create an empty cache.

        Args:
            max_size: Maximum number of handlers kept at the same time.
        """
        # Set the limit before UserDict.__init__ so that __setitem__ can
        # already rely on it if the base initializer ever inserts items.
        self._max_size = max_size
        super().__init__()

    def __setitem__(self, key, value) -> None:
        """Store ``value`` under ``key`` and mark it as just used.

        When ``key`` is new and the cache is full, the least recently used
        entries are evicted first so that the cache never holds more than
        ``max_size`` handlers. Overwriting an existing key never evicts
        anything. The entry being written is always kept, so a cache created
        with ``max_size < 1`` still holds the single most recent handler.
        """
        if key not in self.data:
            # Evict *before* inserting, using ``>=`` so the size after the
            # insert is at most ``max_size``. The ``self.data`` guard keeps a
            # non-positive ``max_size`` from looking up an entry in an empty
            # cache.
            while self.data and len(self.data) >= self._max_size:
                stalest_key = min(
                    self.data,
                    key=lambda k: self.data[k]['last_usage_at']
                )
                del self.data[stalest_key]
        self.data[key] = {
            'last_usage_at': time.time(),
            'handler': value
        }

    def __getitem__(self, key: int) -> object:
        """Return the handler stored under ``key`` and refresh its usage time.

        Raises:
            KeyError: If ``key`` is not in the cache.
        """
        record = super().__getitem__(key)
        record['last_usage_at'] = time.time()
        return record['handler']
# Module-level cache instance, created once at import time with the default
# size limit.
handlers_cacher = HandlersCache()