Coverage for mindsdb / interfaces / triggers / trigger_task.py: 0%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import copy 

2import traceback 

3from mindsdb_sql_parser import parse_sql 

4from mindsdb_sql_parser.ast import Data, Identifier 

5from mindsdb.integrations.utilities.query_traversal import query_traversal 

6 

7from mindsdb.interfaces.storage import db 

8 

9from mindsdb.api.executor.controllers.session_controller import SessionController 

10from mindsdb.api.executor.command_executor import ExecuteCommands 

11 

12from mindsdb.interfaces.database.projects import ProjectController 

13from mindsdb.utilities import log 

14from mindsdb.interfaces.tasks.task import BaseTask 

15from mindsdb.utilities.context import context as ctx 

16 

17logger = log.getLogger(__name__) 

18 

19 

20class TriggerTask(BaseTask): 

21 def __init__(self, *args, **kwargs): 

22 super().__init__(*args, **kwargs) 

23 self.command_executor = None 

24 self.query = None 

25 

26 # callback might be without context 

27 self._ctx_dump = ctx.dump() 

28 

29 def run(self, stop_event): 

30 trigger = db.Triggers.query.get(self.object_id) 

31 

32 # parse query 

33 self.query = parse_sql(trigger.query_str) 

34 

35 session = SessionController() 

36 

37 # prepare executor 

38 project_controller = ProjectController() 

39 project = project_controller.get(trigger.project_id) 

40 

41 session.database = project.name 

42 

43 self.command_executor = ExecuteCommands(session) 

44 

45 # subscribe 

46 database = session.integration_controller.get_by_id(trigger.database_id) 

47 data_handler = session.integration_controller.get_data_handler(database["name"]) 

48 

49 columns = trigger.columns 

50 if columns is not None: 

51 if columns == "": 

52 columns = None 

53 else: 

54 columns = columns.split("|") 

55 

56 data_handler.subscribe(stop_event, self._callback, trigger.table_name, columns=columns) 

57 

58 def _callback(self, row, key=None): 

59 logger.debug(f"trigger call: {row}, {key}") 

60 

61 # set up environment 

62 ctx.load(self._ctx_dump) 

63 

64 try: 

65 if key is not None: 

66 row.update(key) 

67 table = [row] 

68 

69 # inject data to query 

70 query = copy.deepcopy(self.query) 

71 

72 def find_table(node, is_table, **kwargs): 

73 if is_table: 

74 if isinstance(node, Identifier) and len(node.parts) == 1 and node.parts[0] == "TABLE_DELTA": 

75 # replace with data 

76 return Data(table, alias=node.alias) 

77 

78 query_traversal(query, find_table) 

79 

80 # exec query 

81 ret = self.command_executor.execute_command(query) 

82 if ret.error_code is not None: 

83 self.set_error(ret.error_message) 

84 

85 except Exception: 

86 logger.exception("Error during trigger call processing") 

87 self.set_error(str(traceback.format_exc())) 

88 

89 db.session.commit()