Coverage for mindsdb / interfaces / jobs / jobs_controller.py: 38%

268 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import re 

2import datetime as dt 

3from dateutil.relativedelta import relativedelta 

4from typing import List 

5 

6import sqlalchemy as sa 

7 

8from mindsdb_sql_parser import parse_sql, ParsingException 

9from mindsdb_sql_parser.ast.mindsdb import CreateJob 

10from mindsdb_sql_parser.ast import Select, Star, Identifier, BinaryOperation, Constant 

11 

12from mindsdb.utilities.config import config 

13from mindsdb.utilities.context import context as ctx 

14from mindsdb.utilities.exception import EntityNotExistsError, EntityExistsError 

15from mindsdb.interfaces.storage import db 

16from mindsdb.interfaces.database.projects import ProjectController 

17from mindsdb.interfaces.query_context.context_controller import query_context_controller 

18from mindsdb.interfaces.database.log import LogDBController 

19 

20from mindsdb.utilities import log 

21 

# Module-level logger for this file.
logger = log.getLogger(__name__)

# Project name read from the config; JobsController.create falls back to it
# when it is called with project_name=None.
default_project = config.get("default_project")

25 

26 

# Matches one SQL statement: a run of text up to (not including) a ';' that is
# outside single- or double-quoted literals. The trailing lone-quote
# alternative keeps an unterminated quote in the statement (instead of
# silently dropping the character), so the parser can report a proper error.
_SQL_STATEMENT_RE = re.compile(r"""((?:[^;"']|"[^"]*"|'[^']*'|["'])+)""")


def split_sql(sql):
    """
    Split a string containing several SQL statements by ';'.

    Semicolons inside single- or double-quoted literals are not treated as
    delimiters. Fragments that contain only whitespace (for example the text
    after a trailing ';') are dropped, so "select 1;\\n" yields one statement.
    Returned fragments are not stripped.

    :param sql: SQL text, possibly with several ';'-separated statements
    :return: list of statement strings (empty list for empty/blank input)
    """
    # With a capturing group, re.split returns [gap, match, gap, match, ..., gap];
    # the odd positions are the statements themselves, the gaps are the ';'.
    fragments = _SQL_STATEMENT_RE.split(sql)[1::2]
    return [fragment for fragment in fragments if fragment.strip()]

31 

32 

def calc_next_date(schedule_str, base_date: dt.datetime):
    """
    Compute the next run date of a repeating job.

    Supported schedule format (case-insensitive, surrounding whitespace ignored):
    ``every <dimension>`` or ``every <number> <dimension>``, where <number> is a
    positive integer and <dimension> is one of: minute/minutes/min, hour/hours,
    day/days, week/weeks, month/months.

    :param schedule_str: schedule description, e.g. "every 2 hours"
    :param base_date: datetime to count the period from
    :return: base_date advanced by one period
    :raises NotImplementedError: if the string does not start with "every "
        (e.g. cron syntax, which is not supported yet)
    :raises ValueError: if the number or the dimension can't be parsed,
        or the number is 0
    """
    schedule_str = schedule_str.lower().strip()

    repeat_prefix = "every "
    if not schedule_str.startswith(repeat_prefix):
        # TODO cron format
        raise NotImplementedError(f"Schedule: {schedule_str}")
    repeat_str = schedule_str[len(repeat_prefix):]

    items = repeat_str.split()
    if len(items) == 1:
        value, period = "1", items[0]
    elif len(items) == 2:
        value, period = items
    else:
        raise ValueError(f"Can't parse repeat string: {repeat_str}")

    # isdecimal() accepts exactly the characters int() can parse; isdigit() also
    # accepts e.g. superscript digits, which would then make int() fail
    if not value.isdecimal():
        raise ValueError(f"Number expected: {value}")
    value = int(value)
    if value == 0:
        # a zero period would leave next_run_at unchanged, so the job would be
        # due again right after every run
        raise ValueError(f"Period must be a positive number: {value}")

    if period in ("minute", "minutes", "min"):
        delta = dt.timedelta(minutes=value)
    elif period in ("hour", "hours"):
        delta = dt.timedelta(hours=value)
    elif period in ("day", "days"):
        delta = dt.timedelta(days=value)
    elif period in ("week", "weeks"):
        delta = dt.timedelta(weeks=value)
    elif period in ("month", "months"):
        # calendar months, not a fixed number of days
        delta = relativedelta(months=value)
    else:
        raise ValueError(f"Unknown period: {period}")

    # NOTE: a minimum-period limit (at least one day for cloud users with
    # user_class == 0) used to be enforced here; it is currently disabled.

    return base_date + delta

79 

80 

def parse_job_date(date_str: str) -> dt.datetime:
    """
    Convert a string used as a job start/end date to a datetime object.

    Accepted values (surrounding whitespace is ignored):
      - "NOW" (case-insensitive): the current local time, ``dt.datetime.now()``
      - "YYYY-MM-DD HH:MM:SS"
      - "YYYY-MM-DD" (midnight of that day)

    :param date_str: date string to parse
    :return: naive datetime
    :raises ValueError: if the string matches none of the accepted formats
    """
    value = date_str.strip()
    if value.upper() == "NOW":
        return dt.datetime.now()

    for date_format in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            # stop at the first format that matches
            return dt.datetime.strptime(value, date_format)
        except ValueError:
            continue
    raise ValueError(f"Can't parse date: {date_str}")

101 

102 

class JobsController:
    """
    Create, read and delete jobs stored in the ``db.Jobs`` table.

    Jobs are soft-deleted: deleting one sets its ``deleted_at`` column, and the
    lookups in this class ignore records where it is set.
    """

    # Matches template variables such as {{START_DATE}}. Their values are only
    # known when the job runs, so they are removed before syntax-checking SQL.
    _TEMPLATE_VAR_RE = re.compile(r"\{\{\w+}}")

    def add(
        self,
        name: str,
        project_name: str,
        query: str,
        start_at: dt.datetime = None,
        end_at: dt.datetime = None,
        if_query: str = None,
        schedule_str: str = None,
    ) -> str:
        """
        Create a new job

        More info: https://docs.mindsdb.com/mindsdb_sql/sql/create/jobs#create-job

        :param name: name of the job
        :param project_name: project name
        :param query: sql query for job to execute, it could be several queries separated by ';'
        :param start_at: datetime of first execution of the job, optional (default: now)
        :param end_at: datetime after which job should not be executed anymore;
            discarded when schedule_str is None
        :param if_query: condition for job,
            if this query (or last from list of queries separated by ';') returns data and no error in queries:
            job will be executed; otherwise this run of the job is skipped
        :param schedule_str: description how to repeat job
            at the moment supports: 'every <number> <dimension>' or 'every <dimension>'
        :return: name of created job
        :raises EntityExistsError: if a job with this name already exists in the project
        :raises ValueError: if end_at is earlier than start_at
        :raises ParsingException: if query or if_query can't be parsed
        """
        project_controller = ProjectController()
        project = project_controller.get(name=project_name)

        # check if exists
        if self.get(name, project_name) is not None:
            raise EntityExistsError("Job already exists", name)

        if start_at is None:
            start_at = dt.datetime.now()

        if end_at is not None and end_at < start_at:
            raise ValueError(f"Wrong end date {start_at} > {end_at}")

        # check sql = try to parse it
        self._validate_sql(query)
        if if_query is not None:
            self._validate_sql(if_query)

        # plan next run
        next_run_at = start_at

        if schedule_str is not None:
            # only validates the schedule string: raises if it can't be parsed
            calc_next_date(schedule_str, start_at)
        else:
            # no schedule for job end_at is meaningless
            end_at = None

        # create job record
        record = db.Jobs(
            company_id=ctx.company_id,
            user_class=ctx.user_class,
            name=name,
            project_id=project.id,
            query_str=query,
            if_query_str=if_query,
            start_at=start_at,
            end_at=end_at,
            next_run_at=next_run_at,
            schedule_str=schedule_str,
        )
        db.session.add(record)
        db.session.commit()

        return name

    def _validate_sql(self, sql_text: str) -> None:
        """Parse each ';'-separated statement of sql_text, raising ParsingException on the first failure."""
        for sql in split_sql(sql_text):
            # replace template variables with an empty string
            sql = self._TEMPLATE_VAR_RE.sub("", sql)
            try:
                parse_sql(sql)
            except ParsingException as e:
                raise ParsingException(f"Unable to parse: {sql}: {e}") from e

    def create(self, name: str, project_name: str, query: CreateJob) -> str:
        """
        Create job using AST query
        :param name: name of the job
        :param project_name: project name, the configured default project is used if None
        :param query: AST query with job parameters
        :return: name of created job
        """

        if project_name is None:
            project_name = default_project

        start_at = None
        if query.start_str is not None:
            start_at = parse_job_date(query.start_str)
            # a start date in the past means "start now"
            now = dt.datetime.now()
            if start_at < now:
                start_at = now

        end_at = None
        if query.end_str is not None:
            end_at = parse_job_date(query.end_str)

        schedule_str = None
        if query.repeat_str is not None:
            schedule_str = "every " + query.repeat_str

        return self.add(
            name,
            project_name,
            query=query.query_str,
            start_at=start_at,
            end_at=end_at,
            if_query=query.if_query_str,
            schedule_str=schedule_str,
        )

    def delete(self, name, project_name):
        """
        Soft-delete a job and drop its query contexts.

        :raises EntityNotExistsError: if there is no such (non-deleted) job
        """
        project_controller = ProjectController()
        project = project_controller.get(name=project_name)

        # check if exists
        record = (
            db.session.query(db.Jobs)
            .filter_by(company_id=ctx.company_id, name=name, project_id=project.id, deleted_at=sa.null())
            .first()
        )
        if record is None:
            raise EntityNotExistsError("Job does not exist", name)

        self._delete_record(record)
        db.session.commit()

        # delete context
        query_context_controller.drop_query_context("job", record.id)
        query_context_controller.drop_query_context("job-if", record.id)

    def _delete_record(self, record):
        """Mark a job record as deleted (not committed)."""
        record.deleted_at = dt.datetime.now()

    @staticmethod
    def _record_to_dict(record, project_name: str) -> dict:
        """Convert a db.Jobs record into the dict returned by get() and get_list()."""
        return {
            "id": record.id,
            "name": record.name,
            "project": project_name,
            "start_at": record.start_at,
            "end_at": record.end_at,
            "next_run_at": record.next_run_at,
            "schedule_str": record.schedule_str,
            "query": record.query_str,
            "if_query": record.if_query_str,
            "variables": query_context_controller.get_context_vars("job", record.id),
        }

    def get_list(self, project_name=None):
        """
        List non-deleted jobs of the current company.

        :param project_name: if given, only jobs of this project are returned
        :return: list of dicts with info about jobs
        """
        query = db.session.query(db.Jobs).filter_by(company_id=ctx.company_id, deleted_at=sa.null())

        project_controller = ProjectController()
        if project_name is not None:
            project = project_controller.get(name=project_name)
            query = query.filter_by(project_id=project.id)

        project_names = {i.id: i.name for i in project_controller.get_list()}
        return [self._record_to_dict(record, project_names[record.project_id]) for record in query]

    def get(self, name: str, project_name: str) -> dict:
        """
        Get info about job
        :param name: name of the job
        :param project_name: job's project
        :return: dict with info about job, or None if there is no such (non-deleted) job
        """

        project_controller = ProjectController()
        project = project_controller.get(name=project_name)

        record = (
            db.session.query(db.Jobs)
            .filter_by(company_id=ctx.company_id, name=name, project_id=project.id, deleted_at=sa.null())
            .first()
        )

        if record is None:
            return None
        return self._record_to_dict(record, project_name)

    def get_history(self, name: str, project_name: str) -> List[dict]:
        """
        Get history of the job's calls
        :param name: job name
        :param project_name: project name
        :return: List of job executions
        """

        logs_db_controller = LogDBController()

        # SELECT * FROM jobs_history WHERE name = <name> AND project = <project_name>
        query = Select(
            targets=[Star()],
            from_table=Identifier("jobs_history"),
            where=BinaryOperation(
                op="and",
                args=[
                    BinaryOperation(op="=", args=[Identifier("name"), Constant(name)]),
                    BinaryOperation(op="=", args=[Identifier("project"), Constant(project_name)]),
                ],
            ),
        )
        response = logs_db_controller.query(query)

        names = [i["name"] for i in response.columns]
        return response.data_frame[names].to_dict(orient="records")

338 

339 

class JobsExecutor:
    """
    Runs due jobs: selects them, claims a run via a JobsHistory record,
    executes their SQL and plans the next run.
    """

    def get_next_tasks(self):
        """Return active, non-deleted jobs whose next_run_at is in the past, earliest first."""
        # filter next_run < now
        query = (
            db.session.query(db.Jobs)
            .filter(
                db.Jobs.next_run_at < dt.datetime.now(),
                db.Jobs.deleted_at == sa.null(),
                db.Jobs.active == True,  # noqa
            )
            .order_by(db.Jobs.next_run_at)
        )

        return query.all()

    def update_task_schedule(self, record):
        """
        Plan the next run of a job after it was executed, or soft-delete it.

        Does nothing if next_run_at is already in the future. Jobs without a
        schedule_str, and jobs whose next run would be after end_at, are
        soft-deleted. Otherwise next_run_at is advanced by one schedule period
        from its previous value, but never set earlier than now.
        Changes are not committed here.
        """
        if record.next_run_at > dt.datetime.now():
            # do nothing, it is already planned in future
            return

        if record.schedule_str is None:
            # not need to run it anymore
            self._delete_record(record)
            return

        next_run_at = calc_next_date(record.schedule_str, base_date=record.next_run_at)

        if next_run_at is None:
            # no need to run it
            self._delete_record(record)
        elif record.end_at is not None and next_run_at > record.end_at:
            self._delete_record(record)
        else:
            # plan next run, but not in the past
            if next_run_at < dt.datetime.now():
                next_run_at = dt.datetime.now()
            record.next_run_at = next_run_at

    def _delete_record(self, record):
        """Mark a job record as deleted (not committed)."""
        record.deleted_at = dt.datetime.now()

    def lock_record(self, record_id):
        """
        Try to claim the current run of a job.

        Workaround for several concurrent workers on cloud: a JobsHistory
        record for (job, next_run_at) is created before the task starts; if
        that insert fails, the run is assumed to be claimed elsewhere.
        An existing history record for the run that was last updated more than
        30 seconds ago is treated as a stale lock and deleted, so a later call
        can claim the run.

        :param record_id: id of the db.Jobs record
        :return: id of the created history record, or None if the run was not claimed
        """
        record = db.Jobs.query.get(record_id)

        try:
            history_record = db.JobsHistory(job_id=record.id, start_at=record.next_run_at, company_id=record.company_id)

            db.session.add(history_record)
            db.session.commit()

            return history_record.id

        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            db.session.rollback()

        # check if it is an old lock
        history_record = db.JobsHistory.query.filter_by(
            job_id=record.id, start_at=record.next_run_at, company_id=record.company_id
        ).first()
        # the insert may have failed for a reason other than an existing lock,
        # in which case there is no history record to inspect
        if history_record is not None and history_record.updated_at < dt.datetime.now() - dt.timedelta(seconds=30):
            db.session.delete(history_record)
            db.session.commit()

        return None

    def __fill_variables(self, sql, record, history_record):
        """
        Substitute template variables in a job statement.

        - {{PREVIOUS_START_DATETIME}}: start_at of the latest other history record
          of the job minus 60 seconds, or the job's created_at if there is none
        - {{START_DATE}} / {{START_DATETIME}}: start_at of the current history record

        Dates are formatted as "%Y-%m-%d" / "%Y-%m-%d %H:%M:%S".
        """
        if "{{PREVIOUS_START_DATETIME}}" in sql:
            # get previous run date
            history_prev = (
                db.session.query(db.JobsHistory.start_at)
                .filter(db.JobsHistory.job_id == record.id, db.JobsHistory.id != history_record.id)
                .order_by(db.JobsHistory.id.desc())
                .first()
            )
            if history_prev is None:
                # start date of the job
                value = record.created_at
            else:
                # fix for twitter: created_at filter must be minimum of 10 seconds prior to the current time
                value = history_prev.start_at - dt.timedelta(seconds=60)
            value = value.strftime("%Y-%m-%d %H:%M:%S")
            sql = sql.replace("{{PREVIOUS_START_DATETIME}}", value)

        if "{{START_DATE}}" in sql:
            value = history_record.start_at.strftime("%Y-%m-%d")
            sql = sql.replace("{{START_DATE}}", value)
        if "{{START_DATETIME}}" in sql:
            value = history_record.start_at.strftime("%Y-%m-%d %H:%M:%S")
            sql = sql.replace("{{START_DATETIME}}", value)
        return sql

    def _run_statements(self, sql_text, record, history_record, command_executor, log_message):
        """
        Execute each ';'-separated statement of sql_text, stopping at the first error.

        :return: tuple (error, data, executed_sql): the error message ("" if none),
            data of the last statement that succeeded (None if none did), and the
            executed statements (after variable substitution), each followed by "; "
        """
        error = ""
        data = None
        executed_sql = ""
        for sql in split_sql(sql_text):
            try:
                # fill template variables
                sql = self.__fill_variables(sql, record, history_record)

                query = parse_sql(sql)
                executed_sql += sql + "; "

                ret = command_executor.execute_command(query)
                if ret.error_code is not None:
                    # keep the error non-empty even if the command gave no message,
                    # so the failure is not mistaken for success
                    error = ret.error_message or f"Query failed with error code {ret.error_code}"
                    break

                data = ret.data
            except Exception as e:
                logger.exception(log_message)
                error = str(e)
                break
        return error, data, executed_sql

    def execute_task_local(self, record_id, history_id=None):
        """
        Execute one run of a job in the current process and record it in JobsHistory.

        The condition query (if_query_str), if any, runs first; the main query
        runs only if the condition produced no error and returned data. Then the
        next run is planned; if that fails, scheduling of the job is stopped
        (next_run_at = None).

        :param record_id: id of the db.Jobs record
        :param history_id: id of a history record created by lock_record;
            a new history record is created if None
        """
        record = db.Jobs.query.get(record_id)

        # set up environment
        ctx.set_default()
        ctx.company_id = record.company_id
        if record.user_class is not None:
            ctx.user_class = record.user_class

        if history_id is None:
            history_record = db.JobsHistory(
                job_id=record.id,
                start_at=dt.datetime.now(),
                company_id=record.company_id,
            )
            db.session.add(history_record)
            db.session.flush()
            history_id = history_record.id
            db.session.commit()
        else:
            history_record = db.JobsHistory.query.get(history_id)

        project_controller = ProjectController()
        project = project_controller.get(record.project_id)

        from mindsdb.api.executor.controllers.session_controller import SessionController
        from mindsdb.api.executor.command_executor import ExecuteCommands

        sql_session = SessionController()
        sql_session.database = project.name
        command_executor = ExecuteCommands(sql_session)

        executed_sql = ""
        error = ""

        # job with condition?
        query_context_controller.set_context("job-if", record.id)
        to_execute_query = True
        if record.if_query_str is not None:
            error, data, executed_sql = self._run_statements(
                record.if_query_str,
                record,
                history_record,
                command_executor,
                "Error to execute job`s condition query",
            )
            # check error or last result
            if error or data is None or len(data) == 0:
                to_execute_query = False
        query_context_controller.release_context("job-if", record.id)

        if to_execute_query:
            query_context_controller.set_context("job", record.id)
            error, _, main_sql = self._run_statements(
                record.query_str,
                record,
                history_record,
                command_executor,
                "Error to execute job`s query",
            )
            executed_sql += main_sql

        try:
            self.update_task_schedule(record)
        except Exception as e:
            db.session.rollback()
            logger.exception("Error to update schedule:")
            schedule_error = f"Error to update schedule: {e}"
            # keep a previous query error and separate the two messages
            error = f"{error}; {schedule_error}" if error else schedule_error

            # stop scheduling
            record.next_run_at = None

        # reload: the session may have been rolled back above
        history_record = db.JobsHistory.query.get(history_id)

        if error:
            history_record.error = error
        history_record.end_at = dt.datetime.now()
        history_record.query_str = executed_sql

        db.session.commit()