Coverage for mindsdb / interfaces / jobs / scheduler.py: 0%
88 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import datetime as dt
2import queue
3import random
4import threading
5import time
7from mindsdb.interfaces.jobs.jobs_controller import JobsExecutor
8from mindsdb.interfaces.storage import db
9from mindsdb.utilities import log
10from mindsdb.utilities.config import Config
11from mindsdb.utilities.sentry import sentry_sdk # noqa: F401
13logger = log.getLogger(__name__)
def execute_async(q_in, q_out):
    """Worker loop: run queued job tasks one at a time and signal completion.

    Blocks on ``q_in`` for task dictionaries. A dictionary whose ``"type"`` is
    ``"task"`` must also carry ``"record_id"`` and ``"history_id"``; it is
    executed through ``JobsExecutor.execute_task_local``. Any other ``"type"``
    value ends the loop and the function returns.

    Exactly one ``True`` is put on ``q_out`` for every task taken from
    ``q_in``, whether the task succeeded, failed, or was interrupted, so the
    thread waiting on ``q_out`` is never left without a completion signal.

    Args:
        q_in: queue the task dictionaries are read from.
        q_out: queue that receives ``True`` once per processed task.

    Raises:
        KeyboardInterrupt, SystemExit: re-raised after the completion signal
            has been posted.
    """
    while True:
        task = q_in.get()

        if task["type"] != "task":
            return

        record_id = task["record_id"]
        history_id = task["history_id"]

        try:
            # Constructed inside the ``try`` so that a failing constructor is
            # handled like any other task failure instead of killing the thread.
            executor = JobsExecutor()
            executor.execute_task_local(record_id, history_id)
        except Exception:
            logger.exception(f"Error while executing job task {record_id} (history {history_id})")
            try:
                db.session.rollback()
            except Exception:
                # A failing rollback must not prevent the completion signal.
                logger.exception("Unable to roll back the session after a failed job task")
        finally:
            # Also runs for KeyboardInterrupt/SystemExit, which then propagate.
            q_out.put(True)
class Scheduler:
    """Periodically look up due jobs and run them one at a time.

    A background thread running ``execute_async`` is started on construction.
    Tasks are handed to it through ``q_in`` and each finished task is
    acknowledged with one item on ``q_out``.
    """

    def __init__(self, config=None):
        """Create the task queues and start the worker thread.

        Args:
            config: mapping-like configuration read with ``.get("jobs", {})``.
                Note that ``start()`` replaces it with a fresh ``Config()``.
        """
        self.config = config

        self.q_in = queue.Queue()
        self.q_out = queue.Queue()
        self.work_thread = threading.Thread(
            target=execute_async, args=(self.q_in, self.q_out), name="Scheduler.execute_async"
        )
        self.work_thread.start()

    def __del__(self):
        # __del__ may run on an instance whose __init__ did not finish, in
        # which case there is no queue (and no worker thread) to stop.
        if hasattr(self, "q_in"):
            self.stop_thread()

    def stop_thread(self):
        """Ask the worker thread to exit by queueing a non-task message."""
        self.q_in.put({"type": "exit"})

    def scheduler_monitor(self):
        """Run ``check_timetable`` forever, sleeping between checks.

        The pause is ``jobs.check_interval`` seconds (default 30) plus a random
        1-10 seconds. Errors raised by a check are logged and the loop goes on;
        ``SystemExit`` and ``KeyboardInterrupt`` propagate to the caller.
        """
        check_interval = self.config.get("jobs", {}).get("check_interval", 30)

        while True:
            logger.debug("Scheduler check timetable")
            try:
                self.check_timetable()
            except (SystemExit, KeyboardInterrupt):
                raise
            except Exception:
                logger.exception("Error in 'scheduler_monitor'")

            # Random jitter so that different instances do not start at the same time.
            time.sleep(check_interval + random.randint(1, 10))

    def check_timetable(self):
        """Execute every task currently returned by ``JobsExecutor.get_next_tasks``.

        The scoped DB session is removed afterwards even when a task raises, so
        the next check does not inherit a session left over from the failure.
        """
        executor = JobsExecutor()

        exec_method = self.config.get("jobs", {}).get("executor", "local")

        try:
            for record in executor.get_next_tasks():
                logger.info(f"Job execute: {record.name}({record.id})")
                self.execute_task(record.id, exec_method)
        finally:
            db.session.remove()

    def execute_task(self, record_id, exec_method):
        """Run one job on the worker thread and block until it has finished.

        While waiting, the job's history record is touched every 3 seconds.

        Args:
            record_id: id of the job record to execute.
            exec_method: executor mode; only ``"local"`` is implemented.

        Raises:
            NotImplementedError: if ``exec_method`` is not ``"local"``.
        """
        if exec_method != "local":
            # TODO add microservice mode
            raise NotImplementedError()

        executor = JobsExecutor()
        history_id = executor.lock_record(record_id)
        if history_id is None:
            logger.info(f"Unable create history record for {record_id}, is locked?")
            return

        # run in thread
        self.q_in.put(
            {
                "type": "task",
                "record_id": record_id,
                "history_id": history_id,
            }
        )

        # Keep waiting until the worker acknowledges this task. Leaving early
        # would strand its acknowledgement in q_out, where the next task would
        # pick it up as its own.
        while True:
            try:
                self.q_out.get(timeout=3)
                break
            except queue.Empty:
                self._touch_history(history_id)

    def _touch_history(self, history_id):
        """Best-effort heartbeat: set ``updated_at`` of a history record to now."""
        try:
            history_record = db.JobsHistory.query.get(history_id)
            if history_record is None:
                logger.warning(f"Jobs history record {history_id} not found, heartbeat skipped")
                return
            # update last date:
            history_record.updated_at = dt.datetime.now()
            db.session.commit()
        except Exception:
            logger.exception(f"Unable to update jobs history record {history_id}")
            db.session.rollback()

    def start(self):
        """Load the configuration, initialise the DB and run the monitor loop.

        ``KeyboardInterrupt`` and ``SystemExit`` end the loop quietly. The
        worker thread is asked to stop however the loop ends, so the non-daemon
        thread cannot keep the process alive after a failure.
        """
        config = Config()
        db.init()
        self.config = config

        logger.info("Scheduler starts")

        try:
            self.scheduler_monitor()
        except (KeyboardInterrupt, SystemExit):
            # Normal shutdown request; nothing else to do.
            pass
        finally:
            self.stop_thread()
def start(verbose=False):
    """Build a ``Scheduler`` and run it in the calling thread.

    Args:
        verbose: accepted for the caller's convenience; not used here.
    """
    Scheduler().start()
if __name__ == "__main__":
    # Executed as a script: run the scheduler in the foreground.
    start()