Coverage for mindsdb / interfaces / jobs / scheduler.py: 0%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import datetime as dt 

2import queue 

3import random 

4import threading 

5import time 

6 

7from mindsdb.interfaces.jobs.jobs_controller import JobsExecutor 

8from mindsdb.interfaces.storage import db 

9from mindsdb.utilities import log 

10from mindsdb.utilities.config import Config 

11from mindsdb.utilities.sentry import sentry_sdk # noqa: F401 

12 

13logger = log.getLogger(__name__) 

14 

15 

16def execute_async(q_in, q_out): 

17 while True: 

18 task = q_in.get() 

19 

20 if task["type"] != "task": 

21 return 

22 

23 record_id = task["record_id"] 

24 history_id = task["history_id"] 

25 

26 executor = JobsExecutor() 

27 try: 

28 executor.execute_task_local(record_id, history_id) 

29 except (KeyboardInterrupt, SystemExit): 

30 q_out.put(True) 

31 raise 

32 

33 except Exception: 

34 db.session.rollback() 

35 

36 q_out.put(True) 

37 

38 

39class Scheduler: 

40 def __init__(self, config=None): 

41 self.config = config 

42 

43 self.q_in = queue.Queue() 

44 self.q_out = queue.Queue() 

45 self.work_thread = threading.Thread( 

46 target=execute_async, args=(self.q_in, self.q_out), name="Scheduler.execute_async" 

47 ) 

48 self.work_thread.start() 

49 

50 def __del__(self): 

51 self.stop_thread() 

52 

53 def stop_thread(self): 

54 self.q_in.put({"type": "exit"}) 

55 

56 def scheduler_monitor(self): 

57 check_interval = self.config.get("jobs", {}).get("check_interval", 30) 

58 

59 while True: 

60 logger.debug("Scheduler check timetable") 

61 try: 

62 self.check_timetable() 

63 except (SystemExit, KeyboardInterrupt): 

64 raise 

65 except Exception: 

66 logger.exception("Error in 'scheduler_monitor'") 

67 

68 # different instances should start in not the same time 

69 

70 time.sleep(check_interval + random.randint(1, 10)) 

71 

72 def check_timetable(self): 

73 executor = JobsExecutor() 

74 

75 exec_method = self.config.get("jobs", {}).get("executor", "local") 

76 

77 for record in executor.get_next_tasks(): 

78 logger.info(f"Job execute: {record.name}({record.id})") 

79 self.execute_task(record.id, exec_method) 

80 

81 db.session.remove() 

82 

83 def execute_task(self, record_id, exec_method): 

84 executor = JobsExecutor() 

85 if exec_method == "local": 

86 history_id = executor.lock_record(record_id) 

87 if history_id is None: 

88 # db.session.remove() 

89 logger.info(f"Unable create history record for {record_id}, is locked?") 

90 return 

91 

92 # run in thread 

93 

94 self.q_in.put( 

95 { 

96 "type": "task", 

97 "record_id": record_id, 

98 "history_id": history_id, 

99 } 

100 ) 

101 

102 while True: 

103 try: 

104 self.q_out.get(timeout=3) 

105 break 

106 except queue.Empty: 

107 # update last date: 

108 history_record = db.JobsHistory.query.get(history_id) 

109 history_record.updated_at = dt.datetime.now() 

110 db.session.commit() 

111 

112 else: 

113 # TODO add microservice mode 

114 raise NotImplementedError() 

115 

116 def start(self): 

117 config = Config() 

118 db.init() 

119 self.config = config 

120 

121 logger.info("Scheduler starts") 

122 

123 try: 

124 self.scheduler_monitor() 

125 except (KeyboardInterrupt, SystemExit): 

126 self.stop_thread() 

127 pass 

128 

129 

130def start(verbose=False): 

131 scheduler = Scheduler() 

132 

133 scheduler.start() 

134 

135 

136if __name__ == "__main__": 

137 start()