Coverage for mindsdb / interfaces / tasks / task_thread.py: 0%
43 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import traceback
2import threading
3from mindsdb.utilities.context import context as ctx
4from mindsdb.interfaces.storage import db
5from mindsdb.utilities import log
7from mindsdb.interfaces.triggers.trigger_task import TriggerTask
8from mindsdb.interfaces.chatbot.chatbot_task import ChatBotTask
9from mindsdb.interfaces.query_context.query_task import QueryTask
11logger = log.getLogger(__name__)
14class TaskThread(threading.Thread):
15 def __init__(self, task_id):
16 threading.Thread.__init__(self)
17 self.task_id = task_id
18 self._stop_event = threading.Event()
19 self.object_type = None
20 self.object_id = None
22 def run(self):
23 # create context and session
25 task_record = db.Tasks.query.get(self.task_id)
27 ctx.set_default()
28 ctx.company_id = task_record.company_id
29 if task_record.user_class is not None:
30 ctx.user_class = task_record.user_class
31 ctx.task_id = task_record.id
33 self.object_type = task_record.object_type
34 self.object_id = task_record.object_id
36 logger.info(f"Task starting: {self.object_type}.{self.object_id}")
37 try:
38 if self.object_type == "trigger":
39 trigger = TriggerTask(self.task_id, self.object_id)
40 trigger.run(self._stop_event)
42 elif self.object_type == "chatbot":
43 bot = ChatBotTask(self.task_id, self.object_id)
44 bot.run(self._stop_event)
46 elif self.object_type == "query":
47 query = QueryTask(self.task_id, self.object_id)
48 query.run(self._stop_event)
50 except Exception:
51 logger.exception("Error during task processing:")
52 task_record.last_error = traceback.format_exc()
54 db.session.commit()
56 def stop(self):
57 logger.info(f"Task stopping: {self.object_type}.{self.object_id}")
59 self._stop_event.set()