Coverage for mindsdb / interfaces / triggers / trigger_task.py: 0%
55 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import copy
2import traceback
3from mindsdb_sql_parser import parse_sql
4from mindsdb_sql_parser.ast import Data, Identifier
5from mindsdb.integrations.utilities.query_traversal import query_traversal
7from mindsdb.interfaces.storage import db
9from mindsdb.api.executor.controllers.session_controller import SessionController
10from mindsdb.api.executor.command_executor import ExecuteCommands
12from mindsdb.interfaces.database.projects import ProjectController
13from mindsdb.utilities import log
14from mindsdb.interfaces.tasks.task import BaseTask
15from mindsdb.utilities.context import context as ctx
17logger = log.getLogger(__name__)
20class TriggerTask(BaseTask):
21 def __init__(self, *args, **kwargs):
22 super().__init__(*args, **kwargs)
23 self.command_executor = None
24 self.query = None
26 # callback might be without context
27 self._ctx_dump = ctx.dump()
29 def run(self, stop_event):
30 trigger = db.Triggers.query.get(self.object_id)
32 # parse query
33 self.query = parse_sql(trigger.query_str)
35 session = SessionController()
37 # prepare executor
38 project_controller = ProjectController()
39 project = project_controller.get(trigger.project_id)
41 session.database = project.name
43 self.command_executor = ExecuteCommands(session)
45 # subscribe
46 database = session.integration_controller.get_by_id(trigger.database_id)
47 data_handler = session.integration_controller.get_data_handler(database["name"])
49 columns = trigger.columns
50 if columns is not None:
51 if columns == "":
52 columns = None
53 else:
54 columns = columns.split("|")
56 data_handler.subscribe(stop_event, self._callback, trigger.table_name, columns=columns)
58 def _callback(self, row, key=None):
59 logger.debug(f"trigger call: {row}, {key}")
61 # set up environment
62 ctx.load(self._ctx_dump)
64 try:
65 if key is not None:
66 row.update(key)
67 table = [row]
69 # inject data to query
70 query = copy.deepcopy(self.query)
72 def find_table(node, is_table, **kwargs):
73 if is_table:
74 if isinstance(node, Identifier) and len(node.parts) == 1 and node.parts[0] == "TABLE_DELTA":
75 # replace with data
76 return Data(table, alias=node.alias)
78 query_traversal(query, find_table)
80 # exec query
81 ret = self.command_executor.execute_command(query)
82 if ret.error_code is not None:
83 self.set_error(ret.error_message)
85 except Exception:
86 logger.exception("Error during trigger call processing")
87 self.set_error(str(traceback.format_exc()))
89 db.session.commit()