Coverage for mindsdb / interfaces / triggers / triggers_controller.py: 14%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb_sql_parser.ast import Identifier 

2 

3from mindsdb_sql_parser import parse_sql, ParsingException 

4 

5from mindsdb.interfaces.storage import db 

6from mindsdb.interfaces.database.projects import ProjectController 

7from mindsdb.utilities.context import context as ctx 

8from mindsdb.utilities.config import config 

9 

10from mindsdb.api.executor.controllers.session_controller import SessionController 

11 

12 

class TriggersController:
    """Create, delete and list triggers together with their task records.

    Each trigger is stored as a ``db.Triggers`` row plus a ``db.Tasks`` row
    whose ``object_type`` is ``OBJECT_TYPE`` and whose ``object_id`` points at
    the trigger. The task row carries the company scope (``ctx.company_id``)
    that every lookup in this class filters on.
    """

    OBJECT_TYPE = "trigger"

    def add(self, name, project_name, table, query_str, columns=None):
        """Validate and persist a new trigger and its task record.

        Args:
            name: Name of the trigger to create.
            project_name: Project that owns the trigger. When ``None`` the
                ``"default_project"`` config value is used.
            table: Identifier-like object exposing ``.parts``; the first part
                is taken as the database name and the remaining parts as the
                table name.
            query_str: SQL text to store with the trigger; it must be
                parseable by ``parse_sql``.
            columns: Optional sequence of identifier-like objects; the last
                part of each one is stored, joined with ``"|"``.

        Raises:
            Exception: If the trigger already exists, ``table`` has fewer than
                two parts, the handler has no ``subscribe`` attribute, or the
                table is not among the tables reported by the handler.
            ParsingException: If ``query_str`` cannot be parsed.
        """
        if project_name is None:
            project_name = config.get("default_project")
        project_controller = ProjectController()
        project = project_controller.get(name=project_name)

        session = SessionController()

        # check exists
        trigger = self.get_trigger_record(name, project_name)
        if trigger is not None:
            raise Exception(f"Trigger already exists: {name}")

        # check table: at least "<database>.<table>" is required
        if len(table.parts) < 2:
            raise Exception(f"Database or table not found: {table}")

        table_name = Identifier(parts=table.parts[1:]).to_string()
        db_name = table.parts[0]

        db_integration = session.integration_controller.get(db_name)
        db_handler = session.integration_controller.get_data_handler(db_name)

        if not hasattr(db_handler, "subscribe"):
            raise Exception(f"Handler {db_integration['engine']} doesn't support subscription")

        df = db_handler.get_tables().data_frame
        # fall back to the first column when the handler does not name it "table_name"
        column = "table_name"
        if column not in df.columns:
            column = df.columns[0]
        tables = list(df[column])

        # check only if tables are visible (an empty listing skips the check)
        if tables and table_name not in tables:
            raise Exception(f"Table {table_name} not found in {db_name}")

        columns_str = None
        if columns is not None and len(columns) > 0:
            # join to string with delimiter
            columns_str = "|".join([col.parts[-1] for col in columns])

        # check sql
        try:
            parse_sql(query_str)
        except ParsingException as e:
            raise ParsingException(f"Unable to parse: {query_str}: {e}") from e

        # create trigger record
        record = db.Triggers(
            name=name,
            project_id=project.id,
            database_id=db_integration["id"],
            table_name=table_name,
            query_str=query_str,
            columns=columns_str,
        )
        db.session.add(record)
        # flush so that record.id is available for the task row below
        db.session.flush()

        task_record = db.Tasks(
            company_id=ctx.company_id,
            user_class=ctx.user_class,
            object_type=self.OBJECT_TYPE,
            object_id=record.id,
        )
        db.session.add(task_record)
        db.session.commit()

    def delete(self, name, project_name):
        """Delete a trigger and, if present, its task record.

        Args:
            name: Name of the trigger to delete.
            project_name: Name of the project the trigger belongs to.

        Raises:
            Exception: If no matching trigger exists.
        """
        # check exists
        trigger = self.get_trigger_record(name, project_name)
        if trigger is None:
            raise Exception(f"Trigger doesn't exist: {name}")

        task = db.Tasks.query.filter(
            db.Tasks.object_type == self.OBJECT_TYPE,
            db.Tasks.object_id == trigger.id,
            db.Tasks.company_id == ctx.company_id,
        ).first()

        if task is not None:
            db.session.delete(task)

        db.session.delete(trigger)

        db.session.commit()

    def get_trigger_record(self, name, project_name):
        """Return the trigger row with this name in the project, if any.

        The lookup joins ``db.Triggers`` with ``db.Tasks`` and keeps only rows
        whose task has this controller's ``OBJECT_TYPE`` and the current
        ``ctx.company_id``.

        Args:
            name: Trigger name to look up.
            project_name: Name of the project to search in.

        Returns:
            The first matching ``db.Triggers`` record; callers in this class
            treat ``None`` as "not found".
        """
        project_controller = ProjectController()
        project = project_controller.get(name=project_name)

        query = (
            db.session.query(db.Triggers)
            .join(db.Tasks, db.Triggers.id == db.Tasks.object_id)
            .filter(
                db.Triggers.project_id == project.id,
                db.Triggers.name == name,
                db.Tasks.object_type == self.OBJECT_TYPE,
                db.Tasks.company_id == ctx.company_id,
            )
        )
        return query.first()

    def get_list(self, project_name=None):
        """List triggers of the current company, optionally for one project.

        Args:
            project_name: When given, only triggers of that project are
                returned.

        Returns:
            A list of dicts with the keys ``id``, ``project``, ``name``,
            ``database``, ``table``, ``query`` and ``last_error``. A project
            or database id that cannot be resolved to a name is reported
            as ``"?"``.
        """
        session = SessionController()

        query = (
            db.session.query(
                db.Tasks.object_id,
                db.Triggers.project_id,
                db.Triggers.name,
                db.Triggers.database_id,
                db.Triggers.table_name,
                db.Triggers.query_str,
                db.Tasks.last_error,
            )
            .join(db.Triggers, db.Triggers.id == db.Tasks.object_id)
            .filter(
                db.Tasks.object_type == self.OBJECT_TYPE,
                db.Tasks.company_id == ctx.company_id,
            )
        )

        project_controller = ProjectController()
        if project_name is not None:
            project = project_controller.get(name=project_name)
            query = query.filter(db.Triggers.project_id == project.id)

        database_names = {i["id"]: i["name"] for i in session.database_controller.get_list()}
        project_names = {i.id: i.name for i in project_controller.get_list()}

        return [
            {
                "id": record.object_id,
                # use the same "?" fallback as for databases so that one
                # unresolved project id does not abort the whole listing
                "project": project_names.get(record.project_id, "?"),
                "name": record.name,
                "database": database_names.get(record.database_id, "?"),
                "table": record.table_name,
                "query": record.query_str,
                "last_error": record.last_error,
            }
            for record in query
        ]