Coverage for mindsdb / interfaces / jobs / jobs_controller.py: 38%
268 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import re
2import datetime as dt
3from dateutil.relativedelta import relativedelta
4from typing import List
6import sqlalchemy as sa
8from mindsdb_sql_parser import parse_sql, ParsingException
9from mindsdb_sql_parser.ast.mindsdb import CreateJob
10from mindsdb_sql_parser.ast import Select, Star, Identifier, BinaryOperation, Constant
12from mindsdb.utilities.config import config
13from mindsdb.utilities.context import context as ctx
14from mindsdb.utilities.exception import EntityNotExistsError, EntityExistsError
15from mindsdb.interfaces.storage import db
16from mindsdb.interfaces.database.projects import ProjectController
17from mindsdb.interfaces.query_context.context_controller import query_context_controller
18from mindsdb.interfaces.database.log import LogDBController
20from mindsdb.utilities import log
logger = log.getLogger(__name__)

# Read once at import time; JobsController.create falls back to this project
# when it is called with project_name=None.
default_project = config.get("default_project")
def split_sql(sql):
    """
    Split a string containing several SQL statements by ';'.

    Semicolons inside single- or double-quoted literals are not treated as
    delimiters. Fragments consisting only of whitespace (for example the text
    after a trailing ';') are dropped, so callers never try to parse an empty
    statement.

    :param sql: text with one or more statements separated by ';'
    :return: list of statements; whitespace around each statement is preserved
    """
    # A statement is a run of: any char except ';' and quotes, or a complete
    # "..." / '...' literal (which may itself contain ';').
    pattern = re.compile(r"""((?:[^;"']|"[^"]*"|'[^']*')+)""")
    # With a capturing group, re.split returns [between, match, between, match, ...]:
    # odd positions hold the captured statements, even positions the ';' separators.
    return [part for part in pattern.split(sql)[1::2] if part.strip()]
def calc_next_date(schedule_str, base_date: dt.datetime):
    """
    Calculate the next run date of a repeating job.

    Supported formats (case-insensitive, surrounding whitespace ignored):
    'every <period>' and 'every <number> <period>', where <period> is one of
    minute / minutes / min, hour / hours, day / days, week / weeks, month / months.

    :param schedule_str: schedule description, e.g. 'every 2 hours'
    :param base_date: datetime the interval is added to
    :return: base_date shifted by the parsed interval
    :raises NotImplementedError: schedule does not start with 'every ' (e.g. cron syntax)
    :raises Exception: the repeat string, number or period can't be parsed,
        or the number is zero
    """
    schedule_str = schedule_str.lower().strip()

    repeat_prefix = "every "
    if not schedule_str.startswith(repeat_prefix):
        # TODO cron format
        raise NotImplementedError(f"Schedule: {schedule_str}")
    repeat_str = schedule_str[len(repeat_prefix) :]

    items = repeat_str.split()
    if len(items) == 1:
        value, period = "1", items[0]
    elif len(items) == 2:
        value, period = items
    else:
        raise Exception(f"Can't parse repeat string: {repeat_str}")

    # str.isdigit() also accepts characters such as '²' which int() rejects
    if not (value.isascii() and value.isdecimal()):
        raise Exception(f"Number expected: {value}")
    value = int(value)
    if value <= 0:
        # a zero interval would make the job run again on every scheduler tick
        raise Exception(f"Repeat interval must be a positive number: {value}")

    if period in ("minute", "minutes", "min"):
        delta = dt.timedelta(minutes=value)
    elif period in ("hour", "hours"):
        delta = dt.timedelta(hours=value)
    elif period in ("day", "days"):
        delta = dt.timedelta(days=value)
    elif period in ("week", "weeks"):
        delta = dt.timedelta(days=value * 7)  # 1 week = 7 days
    elif period in ("month", "months"):
        # calendar-aware: keeps the day of month instead of adding a fixed number of days
        delta = relativedelta(months=value)
    else:
        raise Exception(f"Unknown period: {period}")

    # NOTE: a minimal-period limitation (at least one day for some cloud users)
    # used to be enforced here; it is disabled for now.

    return base_date + delta
def parse_job_date(date_str: str) -> dt.datetime:
    """
    Convert a string used as a job date (START / END of CREATE JOB) to a datetime.

    Accepted values, surrounding whitespace ignored:
    'now' (any case), 'YYYY-MM-DD HH:MM:SS' or 'YYYY-MM-DD'.

    :param date_str: date string
    :return: naive datetime ('now' gives the current local time)
    :raises ValueError: if the string matches none of the supported formats
    """
    value = date_str.strip()
    if value.upper() == "NOW":
        return dt.datetime.now()

    # the first format that parses wins; no need to try the rest
    for date_format in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return dt.datetime.strptime(value, date_format)
        except ValueError:
            continue
    raise ValueError(f"Can't parse date: {date_str}")
class JobsController:
    """Create, read and delete scheduled jobs stored in the `db.Jobs` table."""

    def add(
        self,
        name: str,
        project_name: str,
        query: str,
        start_at: dt.datetime = None,
        end_at: dt.datetime = None,
        if_query: str = None,
        schedule_str: str = None,
    ) -> str:
        """
        Create a new job

        More info: https://docs.mindsdb.com/mindsdb_sql/sql/create/jobs#create-job

        :param name: name of the job
        :param project_name: project name
        :param query: sql query for job to execute, it could be several queries separated by ';'
        :param start_at: datetime of first execution of the job, optional (defaults to now)
        :param end_at: datetime after which job should not be executed anymore;
            ignored (stored as None) when schedule_str is not set
        :param if_query: condition for job:
            the job is executed only if this query (or the last of the queries separated by ';')
            returns data and none of the queries fails
        :param schedule_str: description how to repeat job
            at the moment supports: 'every <number> <dimension>' or 'every <dimension>'
        :return: name of created job
        :raises EntityExistsError: a job with this name already exists in the project
        :raises ParsingException: query or if_query can't be parsed
        :raises Exception: end_at is earlier than start_at, or schedule_str can't be parsed
        """
        project_controller = ProjectController()
        project = project_controller.get(name=project_name)

        # check if exists
        if self.get(name, project_name) is not None:
            raise EntityExistsError("Job already exists", name)

        if start_at is None:
            start_at = dt.datetime.now()

        if end_at is not None and end_at < start_at:
            raise Exception(f"Wrong end date {start_at} > {end_at}")

        # fail on syntax errors now rather than at the first scheduled run
        self._check_sql(query)
        if if_query is not None:
            self._check_sql(if_query)

        if schedule_str is not None:
            # only validates the schedule string: the first run is at start_at anyway
            calc_next_date(schedule_str, start_at)
        else:
            # no schedule for job end_at is meaningless
            end_at = None

        record = db.Jobs(
            company_id=ctx.company_id,
            user_class=ctx.user_class,
            name=name,
            project_id=project.id,
            query_str=query,
            if_query_str=if_query,
            start_at=start_at,
            end_at=end_at,
            next_run_at=start_at,
            schedule_str=schedule_str,
        )
        db.session.add(record)
        db.session.commit()

        return name

    @staticmethod
    def _check_sql(sql_text: str) -> None:
        """
        Check that every ';'-separated statement of sql_text can be parsed.

        :param sql_text: one or more statements separated by ';'
        :raises ParsingException: with the offending statement in the message
        """
        for sql in split_sql(sql_text):
            # template variables ({{NAME}}) are substituted only at execution time,
            # so they are removed (replaced with an empty string) before parsing
            sql = re.sub(r"\{\{\w+}}", "", sql)
            try:
                parse_sql(sql)
            except ParsingException as e:
                raise ParsingException(f"Unable to parse: {sql}: {e}") from e

    def create(self, name: str, project_name: str, query: "CreateJob") -> str:
        """
        Create job using AST query

        :param name: name of the job
        :param project_name: project name; the configured default project is used if None
        :param query: AST query with job parameters
        :return: name of created job
        """
        if project_name is None:
            project_name = default_project

        start_at = None
        if query.start_str is not None:
            # a start date in the past means "start now"
            start_at = max(parse_job_date(query.start_str), dt.datetime.now())

        end_at = None if query.end_str is None else parse_job_date(query.end_str)
        schedule_str = None if query.repeat_str is None else "every " + query.repeat_str

        return self.add(
            name,
            project_name,
            query=query.query_str,
            start_at=start_at,
            end_at=end_at,
            if_query=query.if_query_str,
            schedule_str=schedule_str,
        )

    def delete(self, name, project_name):
        """
        Soft-delete a job (set its deleted_at) and drop its query contexts.

        :param name: name of the job
        :param project_name: project name
        :raises EntityNotExistsError: the job does not exist in the project
        """
        project = ProjectController().get(name=project_name)

        record = self._get_record(name, project.id)
        if record is None:
            raise EntityNotExistsError("Job does not exist", name)

        self._delete_record(record)
        db.session.commit()

        # delete context
        query_context_controller.drop_query_context("job", record.id)
        query_context_controller.drop_query_context("job-if", record.id)

    def _delete_record(self, record):
        """Mark the record as deleted; the caller commits the session."""
        record.deleted_at = dt.datetime.now()

    @staticmethod
    def _get_record(name: str, project_id: int):
        """Return the non-deleted job of the current company with this name in the project, or None."""
        return (
            db.session.query(db.Jobs)
            .filter_by(company_id=ctx.company_id, name=name, project_id=project_id, deleted_at=sa.null())
            .first()
        )

    @staticmethod
    def _record_to_dict(record, project_name: str) -> dict:
        """Convert a db.Jobs record to the dict returned by `get` and `get_list`."""
        return {
            "id": record.id,
            "name": record.name,
            "project": project_name,
            "start_at": record.start_at,
            "end_at": record.end_at,
            "next_run_at": record.next_run_at,
            "schedule_str": record.schedule_str,
            "query": record.query_str,
            "if_query": record.if_query_str,
            "variables": query_context_controller.get_context_vars("job", record.id),
        }

    def get_list(self, project_name=None):
        """
        List non-deleted jobs of the current company.

        :param project_name: if set, only jobs of this project are returned
        :return: list of dicts with the same keys as returned by `get`
        """
        query = db.session.query(db.Jobs).filter_by(company_id=ctx.company_id, deleted_at=sa.null())

        project_controller = ProjectController()
        if project_name is not None:
            project = project_controller.get(name=project_name)
            query = query.filter_by(project_id=project.id)

        project_names = {i.id: i.name for i in project_controller.get_list()}
        return [self._record_to_dict(record, project_names[record.project_id]) for record in query]

    def get(self, name: str, project_name: str) -> dict:
        """
        Get info about job

        :param name: name of the job
        :param project_name: job's project
        :return: dict with info about job, or None if the job does not exist
        """
        project = ProjectController().get(name=project_name)

        record = self._get_record(name, project.id)
        if record is None:
            return None
        return self._record_to_dict(record, project_name)

    def get_history(self, name: str, project_name: str) -> List[dict]:
        """
        Get history of the job's calls

        :param name: job name
        :param project_name: project name
        :return: list of job executions, one dict per row of the `jobs_history` log table
        """
        logs_db_controller = LogDBController()

        query = Select(
            targets=[Star()],
            from_table=Identifier("jobs_history"),
            where=BinaryOperation(
                op="and",
                args=[
                    BinaryOperation(op="=", args=[Identifier("name"), Constant(name)]),
                    BinaryOperation(op="=", args=[Identifier("project"), Constant(project_name)]),
                ],
            ),
        )
        response = logs_db_controller.query(query)

        names = [i["name"] for i in response.columns]
        return response.data_frame[names].to_dict(orient="records")
class JobsExecutor:
    """Find due jobs, lock them and execute their queries."""

    def get_next_tasks(self):
        """Return active, non-deleted jobs whose next_run_at has passed, oldest first."""
        query = (
            db.session.query(db.Jobs)
            .filter(
                db.Jobs.next_run_at < dt.datetime.now(),
                db.Jobs.deleted_at == sa.null(),
                db.Jobs.active == True,  # noqa
            )
            .order_by(db.Jobs.next_run_at)
        )

        return query.all()

    def update_task_schedule(self, record):
        """
        Plan the next run of a job after it was executed.

        Does nothing when the next run is already in the future. Jobs without
        schedule_str, and jobs whose next run would be after end_at, are
        soft-deleted. This method does not commit the session.
        """
        if record.next_run_at > dt.datetime.now():
            # do nothing, it is already planned in future
            return

        if record.schedule_str is None:
            # one-time job: not need to run it anymore
            self._delete_record(record)
            return

        next_run_at = calc_next_date(record.schedule_str, base_date=record.next_run_at)

        if next_run_at is None:
            # no need to run it
            self._delete_record(record)
        elif record.end_at is not None and next_run_at > record.end_at:
            self._delete_record(record)
        else:
            # plan next run, but not in the past (e.g. after a long downtime)
            if next_run_at < dt.datetime.now():
                next_run_at = dt.datetime.now()
            record.next_run_at = next_run_at

    def _delete_record(self, record):
        """Mark the record as deleted; the caller commits the session."""
        record.deleted_at = dt.datetime.now()

    def lock_record(self, record_id):
        """
        Try to take the lock for the next run of a job.

        Workaround for several concurrent workers on cloud: the lock is a
        JobsHistory row (job_id, start_at=next_run_at) created before the task
        starts. NOTE(review): this presumably relies on a unique constraint on
        those columns making a second insert fail - confirm in the db model.
        If the insert fails and the existing row was not updated for more than
        30 seconds, it is treated as a stale lock and deleted, so a later
        attempt can take the lock.

        :param record_id: id of the db.Jobs record
        :return: id of the created history record, or None if the lock was not taken
        """
        record = db.Jobs.query.get(record_id)

        try:
            history_record = db.JobsHistory(job_id=record.id, start_at=record.next_run_at, company_id=record.company_id)
            db.session.add(history_record)
            db.session.commit()
            return history_record.id
        except Exception:
            db.session.rollback()

        # the insert failed: check if it is an old lock
        history_record = db.JobsHistory.query.filter_by(
            job_id=record.id, start_at=record.next_run_at, company_id=record.company_id
        ).first()
        if (
            history_record is not None
            and history_record.updated_at is not None
            and history_record.updated_at < dt.datetime.now() - dt.timedelta(seconds=30)
        ):
            db.session.delete(history_record)
            db.session.commit()

        return None

    def __fill_variables(self, sql, record, history_record):
        """
        Substitute template variables in a job statement.

        Supported variables:
        - {{PREVIOUS_START_DATETIME}}: start of the previous run minus 60 seconds,
          or the job's created_at when there is no previous run
        - {{START_DATE}}: start date of the current run ('%Y-%m-%d')
        - {{START_DATETIME}}: start of the current run ('%Y-%m-%d %H:%M:%S')
        """
        if "{{PREVIOUS_START_DATETIME}}" in sql:
            # get previous run date
            history_prev = (
                db.session.query(db.JobsHistory.start_at)
                .filter(db.JobsHistory.job_id == record.id, db.JobsHistory.id != history_record.id)
                .order_by(db.JobsHistory.id.desc())
                .first()
            )
            if history_prev is None:
                # start date of the job
                value = record.created_at
            else:
                # fix for twitter: created_at filter must be minimum of 10 seconds prior to the current time
                value = history_prev.start_at - dt.timedelta(seconds=60)
            value = value.strftime("%Y-%m-%d %H:%M:%S")
            sql = sql.replace("{{PREVIOUS_START_DATETIME}}", value)

        if "{{START_DATE}}" in sql:
            value = history_record.start_at.strftime("%Y-%m-%d")
            sql = sql.replace("{{START_DATE}}", value)
        if "{{START_DATETIME}}" in sql:
            value = history_record.start_at.strftime("%Y-%m-%d %H:%M:%S")
            sql = sql.replace("{{START_DATETIME}}", value)
        return sql

    def _execute_queries(self, sql_text, record, history_record, command_executor, error_log_message):
        """
        Execute the ';'-separated statements of sql_text one by one, stopping at the first failure.

        :param error_log_message: message logged with the traceback when a statement raises
        :return: tuple (error, data, executed_sql):
            error - error text, "" if all statements succeeded;
            data - data of the last successfully executed statement, None if none succeeded;
            executed_sql - the statements that were sent for execution, each followed by "; "
        """
        error = ""
        data = None
        executed_sql = ""
        for sql in split_sql(sql_text):
            try:
                # fill template variables
                sql = self.__fill_variables(sql, record, history_record)

                query = parse_sql(sql)
                executed_sql += sql + "; "

                ret = command_executor.execute_command(query)
                if ret.error_code is not None:
                    # never leave the error empty: an empty error would be treated as success
                    error = ret.error_message or f"Query failed with error code {ret.error_code}"
                    break

                data = ret.data
            except Exception as e:
                logger.exception(error_log_message)
                error = str(e)
                break
        return error, data, executed_sql

    def execute_task_local(self, record_id, history_id=None):
        """
        Execute a job in the current process and store the result in JobsHistory.

        If the job has an if-query, the job's queries run only when the if-query
        (its last statement) returns non-empty data and no statement fails.
        Afterwards the next run is planned with update_task_schedule; if that
        fails, scheduling of the job is stopped (next_run_at = None).

        :param record_id: id of the db.Jobs record
        :param history_id: id of a JobsHistory record (e.g. from lock_record);
            if None, a new history record is created here
        """
        record = db.Jobs.query.get(record_id)

        # set up environment
        ctx.set_default()
        ctx.company_id = record.company_id
        if record.user_class is not None:
            ctx.user_class = record.user_class

        if history_id is None:
            history_record = db.JobsHistory(
                job_id=record.id,
                start_at=dt.datetime.now(),
                company_id=record.company_id,
            )
            db.session.add(history_record)
            db.session.flush()
            history_id = history_record.id
            db.session.commit()
        else:
            history_record = db.JobsHistory.query.get(history_id)

        project = ProjectController().get(record.project_id)

        from mindsdb.api.executor.controllers.session_controller import SessionController
        from mindsdb.api.executor.command_executor import ExecuteCommands

        sql_session = SessionController()
        sql_session.database = project.name
        command_executor = ExecuteCommands(sql_session)

        executed_sql = ""
        error = ""
        to_execute_query = True

        # job with condition?
        query_context_controller.set_context("job-if", record.id)
        if record.if_query_str is not None:
            error, data, if_sql = self._execute_queries(
                record.if_query_str,
                record,
                history_record,
                command_executor,
                "Error to execute job`s condition query",
            )
            executed_sql += if_sql
            # check error or last result
            if error or data is None or len(data) == 0:
                to_execute_query = False
        query_context_controller.release_context("job-if", record.id)

        if to_execute_query:
            query_context_controller.set_context("job", record.id)
            error, _, job_sql = self._execute_queries(
                record.query_str,
                record,
                history_record,
                command_executor,
                "Error to execute job`s query",
            )
            executed_sql += job_sql
            query_context_controller.release_context("job", record.id)

        try:
            self.update_task_schedule(record)
        except Exception as e:
            db.session.rollback()
            logger.exception("Error to update schedule:")
            error += f"Error to update schedule: {e}"

            # stop scheduling
            record.next_run_at = None

        # reload: the session may have been rolled back above
        history_record = db.JobsHistory.query.get(history_id)

        if error:
            history_record.error = error
        history_record.end_at = dt.datetime.now()
        history_record.query_str = executed_sql

        db.session.commit()