Coverage for mindsdb / utilities / ml_task_queue / task.py: 15%

50 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

from collections.abc import Callable, Iterator

import redis
from pandas import DataFrame

from mindsdb.utilities.ml_task_queue.utils import RedisKey, from_bytes
from mindsdb.utilities.ml_task_queue.const import ML_TASK_STATUS

8 

9 

class Task:
    """Abstraction for an ML task tracked through Redis.

    The interface is meant to be similar to ``concurrent.futures.Future``.

    Attributes:
        db (Redis): database object
        redis_key (RedisKey): redis keys associated with the task
        dataframe (DataFrame): task result; ``None`` until it has been loaded
        exception (Exception): exception raised during task execution;
            ``None`` until it has been loaded
        _timeout (int): max time (seconds) to wait for a status update
    """

    def __init__(self, connection: redis.Redis, redis_key: RedisKey) -> None:
        """Bind the task to a connection and to the keys that describe it.

        Args:
            connection (Redis): database object used for pub/sub and cache access
            redis_key (RedisKey): redis keys associated with the task
        """
        self.db = connection
        self.redis_key = redis_key
        self.dataframe = None
        self.exception = None
        self._timeout = 60

    def subscribe(self) -> Iterator[ML_TASK_STATUS]:
        """Yield task statuses as they are published on the status channel.

        Iteration ends after the first terminal status:

        * ``COMPLETE`` - the result is loaded from the cache into
          ``self.dataframe`` (when present) and its cache entry is deleted;
        * ``ERROR`` - the stored exception is loaded from the cache into
          ``self.exception`` (when present);
        * ``TIMEOUT`` - no message arrived within ``self._timeout`` seconds.

        The pub/sub object is closed when the generator finishes or is closed
        by the consumer, so abandoning the iteration early does not leak the
        subscription.

        Yields:
            ML_TASK_STATUS: each status received, the terminal one being last
        """
        pubsub = self.db.pubsub()
        try:
            cache = self.db.cache()
            pubsub.subscribe(self.redis_key.status)
            while msg := pubsub.get_message(timeout=self._timeout):
                # Skip subscribe/unsubscribe confirmations; only published
                # messages carry a task status.
                if msg["type"] not in pubsub.PUBLISH_MESSAGE_TYPES:
                    continue
                ml_task_status = ML_TASK_STATUS(msg["data"])
                if ml_task_status == ML_TASK_STATUS.COMPLETE:
                    dataframe_bytes = cache.get(self.redis_key.dataframe)
                    if dataframe_bytes is not None:
                        self.dataframe = from_bytes(dataframe_bytes)
                        cache.delete(self.redis_key.dataframe)
                elif ml_task_status == ML_TASK_STATUS.ERROR:
                    exception_bytes = cache.get(self.redis_key.exception)
                    if exception_bytes is not None:
                        self.exception = from_bytes(exception_bytes)
                yield ml_task_status
                # Stop at a terminal status: polling further would only block
                # for the full timeout and then report a misleading TIMEOUT.
                if ml_task_status in (ML_TASK_STATUS.COMPLETE, ML_TASK_STATUS.ERROR):
                    return
            # There are no messages: the wait for a status update timed out.
            yield ML_TASK_STATUS.TIMEOUT
        finally:
            pubsub.close()

    def wait(self, status: ML_TASK_STATUS = ML_TASK_STATUS.COMPLETE) -> None:
        """Block the calling thread until the task is done or has failed.

        Args:
            status (ML_TASK_STATUS): accepted for interface compatibility.
                It is not consulted: the method always waits for completion.

        Raises:
            Exception: the exception stored by the task when it finished with
                ``ERROR``, or a generic one if none was stored; also raised
                when no status update arrives within ``self._timeout`` seconds.
            KeyError: if an unrecognised status is received.
        """
        # A separate loop variable keeps the ``status`` argument intact.
        for current_status in self.subscribe():
            if current_status in (ML_TASK_STATUS.WAITING, ML_TASK_STATUS.PROCESSING):
                continue
            if current_status == ML_TASK_STATUS.ERROR:
                if self.exception is not None:
                    raise self.exception
                raise Exception("Unknown error during ML task execution")
            if current_status == ML_TASK_STATUS.TIMEOUT:
                raise Exception(f"Can't get answer in {self._timeout} seconds")
            if current_status == ML_TASK_STATUS.COMPLETE:
                return
            raise KeyError("Unknown task status")

    def result(self) -> DataFrame:
        """Wait until the task is done and return its result.

        Returns:
            DataFrame: task result; ``None`` if no result was found in the
                cache when the task completed

        Raises:
            Exception: see ``wait``
        """
        self.wait()
        return self.dataframe

    def add_done_callback(self, fn: Callable) -> None:
        """No-op kept for compatibility with the concurrent.futures.Future interface.

        Args:
            fn (Callable): ignored; it is never called
        """
        pass