Coverage for mindsdb / utilities / ml_task_queue / task.py: 15%
50 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
from collections.abc import Callable, Iterator

import redis
from pandas import DataFrame

from mindsdb.utilities.ml_task_queue.utils import RedisKey, from_bytes
from mindsdb.utilities.ml_task_queue.const import ML_TASK_STATUS


class Task:
    """Abstraction for an ML task tracked through Redis.

    The interface is meant to be similar to ``concurrent.futures.Future``.

    Attributes:
        db (Redis): database object
        redis_key (RedisKey): redis keys associated with the task
        dataframe (DataFrame): task result, filled in once the task completes
        exception (Exception): runtime exception raised during task execution
        _timeout (int): max time (seconds) to wait for a status update
    """

    def __init__(self, connection: redis.Redis, redis_key: RedisKey) -> None:
        """Bind the task to its Redis connection and keys.

        Args:
            connection: connection used for pub/sub and for the result cache.
                NOTE(review): ``subscribe`` calls ``connection.cache()``, which
                plain ``redis.Redis`` does not provide - presumably a wrapper
                class is passed in; confirm against the caller.
            redis_key: keys (status channel, dataframe, exception) of the task.
        """
        self.db = connection
        self.redis_key = redis_key
        self.dataframe = None
        self.exception = None
        self._timeout = 60

    def subscribe(self) -> Iterator[ML_TASK_STATUS]:
        """Yield task statuses until the task is done, failed or timed out.

        On ``COMPLETE`` the result is loaded into ``self.dataframe`` (and its
        cache entry is deleted); on ``ERROR`` the stored exception is loaded
        into ``self.exception``. Either status ends the iteration. If no
        message arrives within ``self._timeout`` seconds, ``TIMEOUT`` is
        yielded as the final status.

        Yields:
            ML_TASK_STATUS: every status published on the task's status channel.
        """
        pubsub = self.db.pubsub()
        try:
            cache = self.db.cache()
            pubsub.subscribe(self.redis_key.status)
            while msg := pubsub.get_message(timeout=self._timeout):
                # Skip service messages (e.g. the subscribe confirmation).
                if msg["type"] not in pubsub.PUBLISH_MESSAGE_TYPES:
                    continue
                ml_task_status = ML_TASK_STATUS(msg["data"])
                if ml_task_status == ML_TASK_STATUS.COMPLETE:
                    dataframe_bytes = cache.get(self.redis_key.dataframe)
                    if dataframe_bytes is not None:
                        self.dataframe = from_bytes(dataframe_bytes)
                        cache.delete(self.redis_key.dataframe)
                    yield ml_task_status
                    # Terminal status: stop listening instead of waiting for
                    # another message and reporting a spurious TIMEOUT.
                    return
                if ml_task_status == ML_TASK_STATUS.ERROR:
                    exception_bytes = cache.get(self.redis_key.exception)
                    if exception_bytes is not None:
                        self.exception = from_bytes(exception_bytes)
                    yield ml_task_status
                    return
                yield ml_task_status
            # get_message() returned nothing within the timeout.
            yield ML_TASK_STATUS.TIMEOUT
        finally:
            # Runs on normal exit and when the consumer abandons the generator,
            # so the subscription is always released.
            pubsub.close()

    def wait(self, status: ML_TASK_STATUS = ML_TASK_STATUS.COMPLETE) -> None:
        """Block the thread until the task is done or failed.

        Args:
            status: kept for interface compatibility. It is currently not
                consulted: the method always waits for a terminal status.

        Raises:
            Exception: the exception stored by the task, a generic one if the
                task failed without storing any, or a timeout error if no
                status update arrived within ``self._timeout`` seconds.
            KeyError: if an unexpected status is received.
        """
        # The loop variable is deliberately not named ``status`` so that it
        # does not shadow the parameter.
        for current_status in self.subscribe():
            if current_status in (ML_TASK_STATUS.WAITING, ML_TASK_STATUS.PROCESSING):
                continue
            if current_status == ML_TASK_STATUS.ERROR:
                if self.exception is not None:
                    raise self.exception
                raise Exception("Unknown error during ML task execution")
            if current_status == ML_TASK_STATUS.TIMEOUT:
                raise Exception(f"Can't get answer in {self._timeout} seconds")
            if current_status == ML_TASK_STATUS.COMPLETE:
                return
            raise KeyError("Unknown task status")

    def result(self) -> DataFrame:
        """Wait until the task is done and return its result.

        Returns:
            DataFrame: task result
        """
        self.wait()
        return self.dataframe

    def add_done_callback(self, fn: Callable) -> None:
        """No-op, needed for compatibility with the concurrent.futures.Future interface."""
        pass