Coverage for mindsdb / interfaces / triggers / triggers_controller.py: 14%
74 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb_sql_parser.ast import Identifier
3from mindsdb_sql_parser import parse_sql, ParsingException
5from mindsdb.interfaces.storage import db
6from mindsdb.interfaces.database.projects import ProjectController
7from mindsdb.utilities.context import context as ctx
8from mindsdb.utilities.config import config
10from mindsdb.api.executor.controllers.session_controller import SessionController
13class TriggersController:
14 OBJECT_TYPE = "trigger"
16 def add(self, name, project_name, table, query_str, columns=None):
17 if project_name is None:
18 project_name = config.get("default_project")
19 project_controller = ProjectController()
20 project = project_controller.get(name=project_name)
22 from mindsdb.api.executor.controllers.session_controller import SessionController
24 session = SessionController()
26 # check exists
27 trigger = self.get_trigger_record(name, project_name)
28 if trigger is not None:
29 raise Exception(f"Trigger already exists: {name}")
31 # check table
32 if len(table.parts) < 2:
33 raise Exception(f"Database or table not found: {table}")
35 table_name = Identifier(parts=table.parts[1:]).to_string()
36 db_name = table.parts[0]
38 db_integration = session.integration_controller.get(db_name)
39 db_handler = session.integration_controller.get_data_handler(db_name)
41 if not hasattr(db_handler, "subscribe"):
42 raise Exception(f"Handler {db_integration['engine']} doest support subscription")
44 df = db_handler.get_tables().data_frame
45 column = "table_name"
46 if column not in df.columns:
47 column = df.columns[0]
48 tables = list(df[column])
50 # check only if tables are visible
51 if len(tables) > 0 and table_name not in tables:
52 raise Exception(f"Table {table_name} not found in {db_name}")
54 columns_str = None
55 if columns is not None and len(columns) > 0:
56 # join to string with delimiter
57 columns_str = "|".join([col.parts[-1] for col in columns])
59 # check sql
60 try:
61 parse_sql(query_str)
62 except ParsingException as e:
63 raise ParsingException(f"Unable to parse: {query_str}: {e}") from e
65 # create job record
66 record = db.Triggers(
67 name=name,
68 project_id=project.id,
69 database_id=db_integration["id"],
70 table_name=table_name,
71 query_str=query_str,
72 columns=columns_str,
73 )
74 db.session.add(record)
75 db.session.flush()
77 task_record = db.Tasks(
78 company_id=ctx.company_id,
79 user_class=ctx.user_class,
80 object_type=self.OBJECT_TYPE,
81 object_id=record.id,
82 )
83 db.session.add(task_record)
84 db.session.commit()
86 def delete(self, name, project_name):
87 # check exists
89 trigger = self.get_trigger_record(name, project_name)
90 if trigger is None:
91 raise Exception(f"Trigger doesn't exist: {name}")
93 task = db.Tasks.query.filter(
94 db.Tasks.object_type == self.OBJECT_TYPE,
95 db.Tasks.object_id == trigger.id,
96 db.Tasks.company_id == ctx.company_id,
97 ).first()
99 if task is not None:
100 db.session.delete(task)
102 db.session.delete(trigger)
104 db.session.commit()
106 def get_trigger_record(self, name, project_name):
107 project_controller = ProjectController()
108 project = project_controller.get(name=project_name)
110 query = (
111 db.session.query(db.Triggers)
112 .join(db.Tasks, db.Triggers.id == db.Tasks.object_id)
113 .filter(
114 db.Triggers.project_id == project.id,
115 db.Triggers.name == name,
116 db.Tasks.object_type == self.OBJECT_TYPE,
117 db.Tasks.company_id == ctx.company_id,
118 )
119 )
120 return query.first()
122 def get_list(self, project_name=None):
123 session = SessionController()
125 query = (
126 db.session.query(
127 db.Tasks.object_id,
128 db.Triggers.project_id,
129 db.Triggers.name,
130 db.Triggers.database_id,
131 db.Triggers.table_name,
132 db.Triggers.query_str,
133 db.Tasks.last_error,
134 )
135 .join(db.Triggers, db.Triggers.id == db.Tasks.object_id)
136 .filter(
137 db.Tasks.object_type == self.OBJECT_TYPE,
138 db.Tasks.company_id == ctx.company_id,
139 )
140 )
142 project_controller = ProjectController()
143 if project_name is not None:
144 project = project_controller.get(name=project_name)
145 query = query.filter(db.Triggers.project_id == project.id)
147 database_names = {i["id"]: i["name"] for i in session.database_controller.get_list()}
149 project_names = {i.id: i.name for i in project_controller.get_list()}
150 data = []
151 for record in query:
152 data.append(
153 {
154 "id": record.object_id,
155 "project": project_names[record.project_id],
156 "name": record.name,
157 "database": database_names.get(record.database_id, "?"),
158 "table": record.table_name,
159 "query": record.query_str,
160 "last_error": record.last_error,
161 }
162 )
163 return data