Coverage for mindsdb / utilities / ml_task_queue / producer.py: 42%

34 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pickle 

2 

3from walrus import Database 

4from pandas import DataFrame 

5 

6from mindsdb.utilities.context import context as ctx 

7from mindsdb.utilities.config import Config 

8from mindsdb.utilities.ml_task_queue.utils import RedisKey, to_bytes 

9from mindsdb.utilities.ml_task_queue.task import Task 

10from mindsdb.utilities.ml_task_queue.base import BaseRedisQueue 

11from mindsdb.utilities.ml_task_queue.const import TASKS_STREAM_NAME, ML_TASK_TYPE, ML_TASK_STATUS 

12from mindsdb.utilities import log 

13from mindsdb.utilities.sentry import sentry_sdk # noqa: F401 

14 

# Module-level logger from mindsdb's log utilities; MLTaskProducer.apply_async
# uses it to report failures when sending a task to redis.
logger = log.getLogger(__name__)

16 

17 

class MLTaskProducer(BaseRedisQueue):
    """Interface around redis for putting ML tasks into the queue.

    Each task is appended as a message to the stream ``TASKS_STREAM_NAME``.
    The optional dataframe and the initial task status are stored as separate
    keys in the walrus cache, each written with an expiry of ``cache_ttl``.

    Attributes:
        cache_ttl (int): expiry passed to ``cache.set`` for the per-task
            dataframe and status keys (seconds, per walrus ``Cache.set``).
            Defaults to 180; override on a subclass or instance if needed.
        db (Database): walrus database object built from the
            ``ml_task_queue`` section of the config.
        stream: walrus stream named ``TASKS_STREAM_NAME`` that task messages
            are added to.
        cache: walrus cache holding the per-task dataframe and status keys.
        pubsub: pubsub object created from ``db``; not used by the methods
            of this class.
    """

    cache_ttl: int = 180

    def __init__(self) -> None:
        config = Config().get("ml_task_queue", {})

        self.db = Database(
            host=config.get("host", "localhost"),
            port=config.get("port", 6379),
            db=config.get("db", 0),
            username=config.get("username"),
            password=config.get("password"),
            protocol=3,  # forwarded to the underlying redis client
        )
        # Wait for redis to respond before creating the helper objects.
        # The argument is presumably a timeout in seconds; wait_redis_ping is
        # defined in BaseRedisQueue (not shown) — confirm there.
        self.wait_redis_ping(60)

        self.stream = self.db.Stream(TASKS_STREAM_NAME)
        self.cache = self.db.cache()
        self.pubsub = self.db.pubsub()

    def apply_async(self, task_type: ML_TASK_TYPE, model_id: int, payload: dict, dataframe: DataFrame = None) -> Task:
        """Add a task to the queue.

        Args:
            task_type (ML_TASK_TYPE): type of the task; its ``.value`` is
                placed in the stream message.
            model_id (int): model identifier.
            payload (dict): lightweight model data; pickled and embedded in
                the stream message.
            dataframe (DataFrame, optional): if given, serialized with
                ``to_bytes`` and transferred via a regular redis cache key
                instead of inside the stream message.

        Returns:
            Task: object representing the task, bound to ``db`` and the
            task's redis key.

        Raises:
            ConnectionError: logged and re-raised.
        """
        try:
            payload = pickle.dumps(payload, protocol=5)
            redis_key = RedisKey.new()
            message = {
                "task_type": task_type.value,
                "company_id": "" if ctx.company_id is None else ctx.company_id,  # None can not be dumped
                "model_id": model_id,
                "payload": payload,
                "redis_key": redis_key.base,
            }

            self.wait_redis_ping()
            # The dataframe and status keys are written before the stream
            # message is added — presumably so a consumer that reads the
            # message already finds them in the cache.
            if dataframe is not None:
                self.cache.set(redis_key.dataframe, to_bytes(dataframe), self.cache_ttl)
            self.cache.set(redis_key.status, ML_TASK_STATUS.WAITING, self.cache_ttl)

            self.stream.add(message)
            return Task(self.db, redis_key)
        # NOTE(review): this catches the *builtin* ConnectionError. redis-py's
        # redis.exceptions.ConnectionError derives from RedisError/Exception,
        # not from the builtin, so connection failures raised by cache.set /
        # stream.add may bypass this log line. Confirm what wait_redis_ping
        # raises, and consider also catching the redis exception here.
        except ConnectionError:
            logger.exception("Cant send message to redis: connect failed")
            raise