Coverage for mindsdb/interfaces/tasks/task_monitor.py: 0%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import datetime as dt 

2import os 

3import socket 

4import time 

5from threading import Event 

6 

7import sqlalchemy as sa 

8 

9from mindsdb.interfaces.storage import db 

10from mindsdb.utilities import log 

11from mindsdb.utilities.config import Config 

12 

13from .task_thread import TaskThread 

14 

15logger = log.getLogger(__name__) 

16 

17 

class TaskMonitor:
    """Poll ``db.Tasks`` and keep one ``TaskThread`` running per active record.

    Each cycle starts threads for active records that have none, stops threads
    whose record is no longer active, whose thread has died or whose record
    asks for a reload, and refreshes ``alive_time`` for the rest. A record is
    claimed by writing ``run_by`` ("<hostname> <pid>") and ``alive_time``.
    """

    # Seconds to pause between two polling cycles.
    MONITOR_INTERVAL_SECONDS = 2
    # A claim whose ``alive_time`` is older than this may be taken over.
    LOCK_EXPIRED_SECONDS = MONITOR_INTERVAL_SECONDS * 30

    def __init__(self):
        # task id -> thread object started for that task
        self._active_tasks = {}

    def start(self, stop_event: Event = None):
        """Run the polling loop until interrupted or until ``stop_event`` is set.

        Args:
            stop_event: optional event; once it is set the loop stops the
                running task threads and returns. With ``None`` the loop only
                ends on ``SystemExit`` / ``KeyboardInterrupt``.
        """
        config = Config()
        db.init()
        self.config = config

        while True:
            try:
                try:
                    self.check_tasks()
                    db.session.rollback()  # disable cache
                except Exception:
                    logger.exception("Error in TaskMonitor.start")
                    db.session.rollback()

                # Pause after every cycle, failed ones included, so that a
                # persistent error cannot turn the loop into a busy spin.
                stop_requested = self._wait_next_cycle(stop_event)
            except (SystemExit, KeyboardInterrupt):
                self.stop_all_tasks()
                return

            if stop_requested:
                # Same cleanup as on interruption: stop the threads and
                # release their claims before returning.
                self.stop_all_tasks()
                return

    def _wait_next_cycle(self, stop_event):
        """Pause for one monitor interval.

        Returns:
            True when ``stop_event`` is set (the wait ends early in that
            case), False otherwise.
        """
        if stop_event is None:
            time.sleep(self.MONITOR_INTERVAL_SECONDS)
            return False
        return bool(stop_event.wait(self.MONITOR_INTERVAL_SECONDS))

    def stop_all_tasks(self):
        """Ask every tracked task thread to stop."""
        # Iterate over a snapshot: stop_task() removes entries from the dict.
        for task_id in list(self._active_tasks):
            self.stop_task(task_id)

    def check_tasks(self):
        """Run one cycle: start missing tasks, stop obsolete ones, refresh the rest."""
        allowed_tasks = set()

        for task in db.session.query(db.Tasks).filter(db.Tasks.active == True):  # noqa
            allowed_tasks.add(task.id)

            # start new tasks
            if task.id not in self._active_tasks:
                self.start_task(task)

        # Check active tasks (snapshot, because stop_task() mutates the dict)
        for task_id, thread in list(self._active_tasks.items()):
            if task_id not in allowed_tasks or not thread.is_alive():
                # old task or dead task
                self.stop_task(task_id)
                continue

            record = db.Tasks.query.get(task_id)
            if record is None:
                # The record disappeared after the query above; nothing is
                # left to run for it.
                self.stop_task(task_id)
            elif record.reload:
                # Reload requested. The cleared flag is committed by
                # _unlock_task() once the thread has actually stopped.
                record.reload = False
                self.stop_task(task_id)
            else:
                # set alive time of running tasks
                self._set_alive(task_id)

    def _lock_task(self, task):
        """Try to claim ``task`` for this host/process.

        The claim succeeds when the record is already owned by this process,
        has no ``alive_time`` yet, or its ``alive_time`` is older than
        ``LOCK_EXPIRED_SECONDS`` according to the database clock.

        Returns:
            True when the claim was written and committed, False when the
            record is held by a different owner whose claim has not expired.
        """
        run_by = f"{socket.gethostname()} {os.getpid()}"
        db_date = db.session.query(sa.func.current_timestamp()).first()[0]

        if task.run_by != run_by and task.alive_time is not None:
            lock_age = db_date - task.alive_time
            if lock_age <= dt.timedelta(seconds=self.LOCK_EXPIRED_SECONDS):
                # held by someone else and still fresh
                return False

        task.run_by = run_by
        task.alive_time = db_date
        db.session.commit()
        return True

    def _set_alive(self, task_id):
        """Refresh ``alive_time`` of the record; do nothing if it no longer exists."""
        task = db.Tasks.query.get(task_id)
        if task is None:
            return
        task.alive_time = db.session.query(sa.func.current_timestamp()).first()[0]
        db.session.commit()

    def _unlock_task(self, task_id):
        """Release the claim by clearing ``alive_time``; ignore missing records."""
        task = db.Tasks.query.get(task_id)
        if task is not None:
            task.alive_time = None
            db.session.commit()

    def start_task(self, task):
        """Claim ``task`` and start a ``TaskThread`` for it; skip if it cannot be claimed."""
        if not self._lock_task(task):
            # can't lock, skip
            return

        thread = TaskThread(task.id)
        thread.start()
        self._active_tasks[task.id] = thread

    def stop_task(self, task_id: int):
        """Stop the thread of ``task_id`` and release its claim.

        The thread gets one second to finish. If it is still alive afterwards
        it stays tracked (and claimed) so that a later call can retry.

        Raises:
            KeyError: if ``task_id`` is not tracked.
        """
        thread = self._active_tasks[task_id]
        thread.stop()
        thread.join(1)

        if thread.is_alive():
            # don't delete task, wait for the next cycle
            return

        del self._active_tasks[task_id]
        self._unlock_task(task_id)

140 

141 

def start(verbose=False):
    """Create a ``TaskMonitor`` and run its loop.

    ``verbose`` is accepted but not used by this function.
    """
    TaskMonitor().start()


# Run the monitor when this module is executed as a script.
if __name__ == "__main__":
    start()