Coverage for mindsdb / utilities / ml_task_queue / producer.py: 42%
34 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pickle
3from walrus import Database
4from pandas import DataFrame
6from mindsdb.utilities.context import context as ctx
7from mindsdb.utilities.config import Config
8from mindsdb.utilities.ml_task_queue.utils import RedisKey, to_bytes
9from mindsdb.utilities.ml_task_queue.task import Task
10from mindsdb.utilities.ml_task_queue.base import BaseRedisQueue
11from mindsdb.utilities.ml_task_queue.const import TASKS_STREAM_NAME, ML_TASK_TYPE, ML_TASK_STATUS
12from mindsdb.utilities import log
13from mindsdb.utilities.sentry import sentry_sdk # noqa: F401
15logger = log.getLogger(__name__)
18class MLTaskProducer(BaseRedisQueue):
19 """Interface around the redis for putting tasks to the queue
21 Attributes:
22 db (Redis): database object
23 stream
24 cache
25 pubsub
26 """
28 def __init__(self) -> None:
29 config = Config().get("ml_task_queue", {})
31 self.db = Database(
32 host=config.get("host", "localhost"),
33 port=config.get("port", 6379),
34 db=config.get("db", 0),
35 username=config.get("username"),
36 password=config.get("password"),
37 protocol=3,
38 )
39 self.wait_redis_ping(60)
41 self.stream = self.db.Stream(TASKS_STREAM_NAME)
42 self.cache = self.db.cache()
43 self.pubsub = self.db.pubsub()
45 def apply_async(self, task_type: ML_TASK_TYPE, model_id: int, payload: dict, dataframe: DataFrame = None) -> Task:
46 """Add tasks to the queue
48 Args:
49 task_type (ML_TASK_TYPE): type of the task
50 model_id (int): model identifier
51 payload (dict): lightweight model data that will be added to stream message
52 dataframe (DataFrame): dataframe will be transfered via regular redis storage
54 Returns:
55 Task: object representing the task
56 """
57 try:
58 payload = pickle.dumps(payload, protocol=5)
59 redis_key = RedisKey.new()
60 message = {
61 "task_type": task_type.value,
62 "company_id": "" if ctx.company_id is None else ctx.company_id, # None can not be dumped
63 "model_id": model_id,
64 "payload": payload,
65 "redis_key": redis_key.base,
66 }
68 self.wait_redis_ping()
69 if dataframe is not None:
70 self.cache.set(redis_key.dataframe, to_bytes(dataframe), 180)
71 self.cache.set(redis_key.status, ML_TASK_STATUS.WAITING, 180)
73 self.stream.add(message)
74 return Task(self.db, redis_key)
75 except ConnectionError:
76 logger.exception("Cant send message to redis: connect failed")
77 raise