Coverage for mindsdb / interfaces / tasks / task_thread.py: 0%

43 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import traceback 

2import threading 

3from mindsdb.utilities.context import context as ctx 

4from mindsdb.interfaces.storage import db 

5from mindsdb.utilities import log 

6 

7from mindsdb.interfaces.triggers.trigger_task import TriggerTask 

8from mindsdb.interfaces.chatbot.chatbot_task import ChatBotTask 

9from mindsdb.interfaces.query_context.query_task import QueryTask 

10 

11logger = log.getLogger(__name__) 

12 

13 

class TaskThread(threading.Thread):
    """Thread that loads one task record and runs the matching task object.

    The task record is looked up by ``task_id`` when the thread starts. Its
    ``object_type`` selects which task class is run ("trigger", "chatbot" or
    "query"). A shared stop event is passed to the task's ``run`` method and
    is set by :meth:`stop`.
    """

    def __init__(self, task_id):
        """Prepare the thread for the task identified by ``task_id``.

        Args:
            task_id: Primary key of the ``db.Tasks`` record to execute.
        """
        super().__init__()
        self.task_id = task_id
        self._stop_event = threading.Event()
        # Both are filled in by run() once the task record has been loaded;
        # until then stop() logs them as None.
        self.object_type = None
        self.object_id = None

    def run(self):
        """Load the task record, set up the context and execute the task.

        Any exception raised by the task is logged and its traceback is
        stored in the record's ``last_error`` field; the session is then
        committed. If no record exists for ``task_id`` an error is logged
        and the thread exits without doing anything else.
        """
        task_record = db.Tasks.query.get(self.task_id)
        if task_record is None:
            # query.get() returns None when no row has this id. Without this
            # guard the attribute accesses below would raise AttributeError
            # and the thread would die with an unhandled exception.
            logger.error(f"Task record not found: {self.task_id}")
            return

        # create context and session
        ctx.set_default()
        ctx.company_id = task_record.company_id
        if task_record.user_class is not None:
            ctx.user_class = task_record.user_class
        ctx.task_id = task_record.id

        self.object_type = task_record.object_type
        self.object_id = task_record.object_id

        logger.info(f"Task starting: {self.object_type}.{self.object_id}")
        try:
            if self.object_type == "trigger":
                trigger = TriggerTask(self.task_id, self.object_id)
                trigger.run(self._stop_event)

            elif self.object_type == "chatbot":
                bot = ChatBotTask(self.task_id, self.object_id)
                bot.run(self._stop_event)

            elif self.object_type == "query":
                query = QueryTask(self.task_id, self.object_id)
                query.run(self._stop_event)

            else:
                # Previously an unrecognised type was skipped silently; make
                # it visible in the log while still finishing normally.
                logger.warning(
                    f"Unknown task object type: {self.object_type}.{self.object_id}"
                )

        except Exception:
            # Top-level boundary of the thread: record the failure on the
            # task record instead of letting the thread crash.
            logger.exception("Error during task processing:")
            task_record.last_error = traceback.format_exc()

        db.session.commit()

    def stop(self):
        """Signal the running task to stop by setting the shared stop event."""
        logger.info(f"Task stopping: {self.object_type}.{self.object_id}")

        self._stop_event.set()