Coverage for mindsdb / utilities / ml_task_queue / utils.py: 44%
60 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
2import pickle
3import socket
4import threading
6from walrus import Database
7from redis.exceptions import ConnectionError as RedisConnectionError
9from mindsdb.utilities.context import context as ctx
10from mindsdb.utilities.ml_task_queue.const import ML_TASK_STATUS
11from mindsdb.utilities.sentry import sentry_sdk # noqa: F401
14def to_bytes(obj: object) -> bytes:
15 """ dump object into bytes
17 Args:
18 obj (object): object to convert
20 Returns:
21 bytes
22 """
23 return pickle.dumps(obj, protocol=5)
26def from_bytes(b: bytes) -> object:
27 """ load object from bytes
29 Args:
30 b (bytes):
32 Returns:
33 object
34 """
35 return pickle.loads(b)
38def wait_redis_ping(db: Database, timeout: int = 30):
39 """ Wait when redis.ping return True
41 Args:
42 db (Database): redis db object
43 timeout (int): seconds to wait for success ping
45 Raises:
46 RedisConnectionError: if `ping` did not return `True` within `timeout` seconds
47 """
48 end_time = time.time() + timeout
49 while time.time() <= end_time:
50 try:
51 if db.ping() is True:
52 break
53 except RedisConnectionError:
54 pass
55 time.sleep(2)
56 else:
57 raise RedisConnectionError
60class RedisKey:
61 """ The class responsible for unique task keys in redis
63 Attributes:
64 _base_key (bytes): prefix for keys
65 """
67 @staticmethod
68 def new() -> 'RedisKey':
69 timestamp = str(time.time()).replace('.', '')
70 return RedisKey(f"{timestamp}-{ctx.company_id}-{socket.gethostname()}".encode())
72 def __init__(self, base_key: bytes) -> None:
73 self._base_key = base_key
75 @property
76 def base(self) -> bytes:
77 return self._base_key
79 @property
80 def status(self) -> str:
81 return (self._base_key + b'-status').decode()
83 @property
84 def dataframe(self) -> str:
85 return (self._base_key + b'-dataframe').decode()
87 @property
88 def exception(self) -> str:
89 return (self._base_key + b'-exception').decode()
92class StatusNotifier(threading.Thread):
93 """ Worker that updates task status in redis with fixed frequency
94 """
96 def __init__(self, redis_key: RedisKey, ml_task_status: ML_TASK_STATUS, db, cache) -> None:
97 threading.Thread.__init__(self)
98 self.redis_key = redis_key
99 self.ml_task_status = ml_task_status
100 self.db = db
101 self.cache = cache
102 self._stop_event = threading.Event()
104 def set_status(self, ml_task_status: ML_TASK_STATUS):
105 """ change status
107 Args:
108 ml_task_status (ML_TASK_STATUS): new status
109 """
110 self.ml_task_status = ml_task_status
112 def stop(self) -> None:
113 """ stop status updating
114 """
115 self._stop_event.set()
117 def run(self):
118 """ start update status with fixed frequency
119 """
120 while not self._stop_event.is_set():
121 wait_redis_ping(self.db)
122 self.db.publish(self.redis_key.status, self.ml_task_status.value)
123 self.cache.set(self.redis_key.status, self.ml_task_status.value, 180)
124 time.sleep(5)