Coverage for mindsdb / utilities / ml_task_queue / utils.py: 44%

60 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import time 

2import pickle 

3import socket 

4import threading 

5 

6from walrus import Database 

7from redis.exceptions import ConnectionError as RedisConnectionError 

8 

9from mindsdb.utilities.context import context as ctx 

10from mindsdb.utilities.ml_task_queue.const import ML_TASK_STATUS 

11from mindsdb.utilities.sentry import sentry_sdk # noqa: F401 

12 

13 

14def to_bytes(obj: object) -> bytes: 

15 """ dump object into bytes 

16 

17 Args: 

18 obj (object): object to convert 

19 

20 Returns: 

21 bytes 

22 """ 

23 return pickle.dumps(obj, protocol=5) 

24 

25 

26def from_bytes(b: bytes) -> object: 

27 """ load object from bytes 

28 

29 Args: 

30 b (bytes): 

31 

32 Returns: 

33 object 

34 """ 

35 return pickle.loads(b) 

36 

37 

38def wait_redis_ping(db: Database, timeout: int = 30): 

39 """ Wait when redis.ping return True 

40 

41 Args: 

42 db (Database): redis db object 

43 timeout (int): seconds to wait for success ping 

44 

45 Raises: 

46 RedisConnectionError: if `ping` did not return `True` within `timeout` seconds 

47 """ 

48 end_time = time.time() + timeout 

49 while time.time() <= end_time: 

50 try: 

51 if db.ping() is True: 

52 break 

53 except RedisConnectionError: 

54 pass 

55 time.sleep(2) 

56 else: 

57 raise RedisConnectionError 

58 

59 

60class RedisKey: 

61 """ The class responsible for unique task keys in redis 

62 

63 Attributes: 

64 _base_key (bytes): prefix for keys 

65 """ 

66 

67 @staticmethod 

68 def new() -> 'RedisKey': 

69 timestamp = str(time.time()).replace('.', '') 

70 return RedisKey(f"{timestamp}-{ctx.company_id}-{socket.gethostname()}".encode()) 

71 

72 def __init__(self, base_key: bytes) -> None: 

73 self._base_key = base_key 

74 

75 @property 

76 def base(self) -> bytes: 

77 return self._base_key 

78 

79 @property 

80 def status(self) -> str: 

81 return (self._base_key + b'-status').decode() 

82 

83 @property 

84 def dataframe(self) -> str: 

85 return (self._base_key + b'-dataframe').decode() 

86 

87 @property 

88 def exception(self) -> str: 

89 return (self._base_key + b'-exception').decode() 

90 

91 

92class StatusNotifier(threading.Thread): 

93 """ Worker that updates task status in redis with fixed frequency 

94 """ 

95 

96 def __init__(self, redis_key: RedisKey, ml_task_status: ML_TASK_STATUS, db, cache) -> None: 

97 threading.Thread.__init__(self) 

98 self.redis_key = redis_key 

99 self.ml_task_status = ml_task_status 

100 self.db = db 

101 self.cache = cache 

102 self._stop_event = threading.Event() 

103 

104 def set_status(self, ml_task_status: ML_TASK_STATUS): 

105 """ change status 

106 

107 Args: 

108 ml_task_status (ML_TASK_STATUS): new status 

109 """ 

110 self.ml_task_status = ml_task_status 

111 

112 def stop(self) -> None: 

113 """ stop status updating 

114 """ 

115 self._stop_event.set() 

116 

117 def run(self): 

118 """ start update status with fixed frequency 

119 """ 

120 while not self._stop_event.is_set(): 

121 wait_redis_ping(self.db) 

122 self.db.publish(self.redis_key.status, self.ml_task_status.value) 

123 self.cache.set(self.redis_key.status, self.ml_task_status.value, 180) 

124 time.sleep(5)