Coverage for mindsdb / interfaces / tasks / task_monitor.py: 0%
97 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import datetime as dt
2import os
3import socket
4import time
5from threading import Event
7import sqlalchemy as sa
9from mindsdb.interfaces.storage import db
10from mindsdb.utilities import log
11from mindsdb.utilities.config import Config
13from .task_thread import TaskThread
# Module-level logger, obtained through the project's logging utility.
logger = log.getLogger(__name__)
class TaskMonitor:
    """Supervise the worker threads of active tasks from this process.

    Every monitoring cycle reads the ``db.Tasks`` rows flagged as active,
    starts a ``TaskThread`` for each task this process manages to lock,
    stops threads whose task is no longer active, has died or was flagged
    for reload, and refreshes ``alive_time`` for the tasks that keep running.
    """

    # Pause between two monitoring cycles.
    MONITOR_INTERVAL_SECONDS = 2
    # A lock whose ``alive_time`` is older than this may be taken over.
    LOCK_EXPIRED_SECONDS = MONITOR_INTERVAL_SECONDS * 30

    def __init__(self):
        # task id -> thread object started by this monitor
        self._active_tasks = {}

    def start(self, stop_event: Event = None):
        """Run monitoring cycles until interrupted or ``stop_event`` is set.

        Args:
            stop_event: optional event checked after every cycle. When it is
                set the loop returns; running task threads are not stopped
                in that case.

        ``SystemExit`` and ``KeyboardInterrupt`` stop all running tasks
        before returning. Any other exception raised by a cycle is logged
        and the loop carries on with the next cycle.
        """
        config = Config()
        db.init()
        self.config = config

        while True:
            try:
                try:
                    self.check_tasks()
                    db.session.rollback()  # disable cache
                except Exception:
                    logger.exception("Error in TaskMonitor.start")
                    db.session.rollback()

                # Sleep after failed cycles as well, so that a persistent
                # error (e.g. an unreachable database) cannot degrade into a
                # busy loop of queries and log records.
                time.sleep(self.MONITOR_INTERVAL_SECONDS)

            except (SystemExit, KeyboardInterrupt):
                self.stop_all_tasks()
                return

            if stop_event is not None and stop_event.is_set():
                return

    def stop_all_tasks(self):
        """Ask every task thread tracked by this monitor to stop."""
        # Iterate over a snapshot: stop_task() removes entries from the dict.
        for task_id in list(self._active_tasks):
            self.stop_task(task_id)

    def check_tasks(self):
        """Run one monitoring cycle: start new tasks, review running ones."""
        allowed_tasks = set()

        for task in db.session.query(db.Tasks).filter(db.Tasks.active == True):  # noqa
            allowed_tasks.add(task.id)

            # start new tasks
            if task.id not in self._active_tasks:
                self.start_task(task)

        # Check active tasks (snapshot: stop_task() mutates the dict)
        for task_id, thread in list(self._active_tasks.items()):
            if task_id not in allowed_tasks:
                # old task
                self.stop_task(task_id)
                continue

            if not thread.is_alive():
                # dead task
                self.stop_task(task_id)
                continue

            # need to be reloaded ?
            record = db.Tasks.query.get(task_id)
            if record is None:
                # The record disappeared after the query above; treat it
                # like a task that is no longer allowed.
                self.stop_task(task_id)
            elif record.reload:
                # The flag reset is persisted by the commit in _unlock_task()
                # once the thread has actually stopped.
                record.reload = False
                self.stop_task(task_id)
            else:
                # set alive time of running tasks
                self._set_alive(task_id)

    def _lock_task(self, task):
        """Try to claim ``task`` for this process.

        The owner is identified by "<hostname> <pid>". The lock is taken
        (or refreshed) when this process already owns it, when the task has
        no ``alive_time``, or when ``alive_time`` is older than
        ``LOCK_EXPIRED_SECONDS`` according to the database clock.

        Returns:
            True when the lock was taken or refreshed and committed,
            False when the task is still locked by another owner.
        """
        run_by = f"{socket.gethostname()} {os.getpid()}"
        db_date = db.session.query(sa.func.current_timestamp()).first()[0]
        if task.run_by == run_by:
            # already locked
            task.alive_time = db_date

        elif task.alive_time is None:
            # not locked yet
            task.run_by = run_by
            task.alive_time = db_date

        elif db_date - task.alive_time > dt.timedelta(seconds=self.LOCK_EXPIRED_SECONDS):
            # lock expired
            task.run_by = run_by
            task.alive_time = db_date

        else:
            return False

        db.session.commit()
        return True

    def _set_alive(self, task_id):
        """Refresh ``alive_time`` of the task with the database timestamp.

        Does nothing when the task record no longer exists.
        """
        task = db.Tasks.query.get(task_id)
        if task is None:
            # Same tolerance as _unlock_task(): the record may be gone.
            return
        task.alive_time = db.session.query(sa.func.current_timestamp()).first()[0]
        db.session.commit()

    def _unlock_task(self, task_id):
        """Release the lock by clearing ``alive_time``, if the task exists."""
        task = db.Tasks.query.get(task_id)
        if task is not None:
            task.alive_time = None
            db.session.commit()

    def start_task(self, task):
        """Lock ``task`` and start a thread for it; skip if it can't be locked."""
        if not self._lock_task(task):
            # can't lock, skip
            return

        thread = TaskThread(task.id)

        thread.start()
        self._active_tasks[task.id] = thread

    def stop_task(self, task_id: int):
        """Stop the thread of ``task_id`` and release its lock.

        The thread gets one second to finish. If it is still alive it stays
        registered and the stop is retried on a later call. Calling this for
        a task that is not tracked is a no-op.
        """
        thread = self._active_tasks.get(task_id)
        if thread is None:
            # Not tracked (never started here or already stopped).
            return

        thread.stop()
        thread.join(1)

        if thread.is_alive():
            # don't delete task, wait next circle
            return

        del self._active_tasks[task_id]
        self._unlock_task(task_id)
def start(verbose=False):
    """Create a ``TaskMonitor`` and run its monitoring loop.

    ``verbose`` is accepted but not used by this function.
    """
    TaskMonitor().start()
# Script entry point: run the task monitor when this module is executed directly.
if __name__ == "__main__":
    start()