Coverage for mindsdb / api / executor / datahub / datanodes / mindsdb_tables.py: 44%

190 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import json 

2 

3import pandas as pd 

4from mindsdb_sql_parser.ast import BinaryOperation, Constant, Select 

5from mindsdb_sql_parser.ast.base import ASTNode 

6 

7from mindsdb.interfaces.agents.agents_controller import AgentsController 

8from mindsdb.interfaces.jobs.jobs_controller import JobsController 

9from mindsdb.interfaces.skills.skills_controller import SkillsController 

10from mindsdb.interfaces.database.views import ViewController 

11from mindsdb.interfaces.database.projects import ProjectController 

12from mindsdb.interfaces.query_context.context_controller import query_context_controller 

13 

14from mindsdb.api.executor.datahub.datanodes.system_tables import Table 

15 

16 

def to_json(obj):
    """Serialize ``obj`` to a JSON string when possible.

    ``None`` is passed through unchanged. A value that ``json.dumps`` rejects
    with ``TypeError`` is returned as-is, so the caller receives either a JSON
    string or the original object.

    :param obj: any value
    :return: JSON string, the original object, or None
    """
    if obj is not None:
        try:
            obj = json.dumps(obj)
        except TypeError:
            # Not JSON-serializable: hand the original value back untouched.
            pass
    return obj

24 

25 

def get_project_name(query: ASTNode = None):
    """Extract the project name from a ``WHERE project = <constant>`` filter.

    Only one shape is recognised: ``query`` must be a ``Select`` whose whole
    ``where`` clause is a single ``=`` ``BinaryOperation`` with an operand
    whose ``parts`` are exactly ``["project"]`` on the left and a ``Constant``
    on the right.

    :param query: parsed statement, or None
    :return: the constant's value, or None when the query has any other shape
    """
    if not isinstance(query, Select):
        return None

    condition = query.where
    # Exact type match on purpose: subclasses of BinaryOperation are not accepted.
    if type(condition) is not BinaryOperation:
        return None
    if condition.op != "=":
        return None
    if condition.args[0].parts != ["project"]:
        return None

    value_node = condition.args[1]
    if not isinstance(value_node, Constant):
        return None
    return value_node.value

37 

38 

class MdbTable(Table):
    """Common base class for the table classes declared in this module."""

    # Default flag inherited by every subclass. How it is consumed is not
    # visible from this module; presumably it controls whether the table is
    # listed to users (confirm against the code that reads it).
    visible: bool = True

41 

42 

class ModelsTable(MdbTable):
    """``MODELS`` table: one row per entry returned by each project's ``get_models``."""

    name = "MODELS"
    columns = [
        "NAME",
        "ENGINE",
        "PROJECT",
        "ACTIVE",
        "VERSION",
        "STATUS",
        "ACCURACY",
        "PREDICT",
        "UPDATE_STATUS",
        "MINDSDB_VERSION",
        "ERROR",
        "SELECT_DATA_QUERY",
        "TRAINING_OPTIONS",
        "CURRENT_TRAINING_PHASE",
        "TOTAL_TRAINING_PHASES",
        "TRAINING_PHASE_NAME",
        "TAG",
        "CREATED_AT",
        "TRAINING_TIME",
    ]

    @classmethod
    def get_data(cls, session, inf_schema, **kwargs):
        """Collect the models of every project into a DataFrame.

        :param session: provides ``show_secrets``, forwarded as ``with_secrets``
        :param inf_schema: provides ``get_projects_names()`` and ``database_controller``
        :param kwargs: ignored
        :return: DataFrame whose columns are ``cls.columns``
        """
        # TODO: optimise - projects the query does not target could be skipped.
        rows = []
        for project_name in inf_schema.get_projects_names():
            project = inf_schema.database_controller.get_project(name=project_name)
            for model in project.get_models(active=None, with_secrets=session.show_secrets):
                model_name = model["name"]
                meta = model["metadata"]
                rows.append(
                    [
                        model_name,
                        meta["engine"],
                        project_name,
                        meta["active"],
                        meta["version"],
                        meta["status"],
                        meta["accuracy"],
                        meta["predict"],
                        meta["update_status"],
                        meta["mindsdb_version"],
                        meta["error"],
                        meta["select_data_query"],
                        to_json(meta["training_options"]),
                        meta["current_training_phase"],
                        meta["total_training_phases"],
                        meta["training_phase_name"],
                        # The TAG column is filled from the "label" metadata key.
                        meta["label"],
                        model["created_at"],
                        meta["training_time"],
                    ]
                )

        return pd.DataFrame(rows, columns=cls.columns)

106 

107 

class DatabasesTable(MdbTable):
    """``DATABASES`` table built from the database controller's list."""

    name = "DATABASES"
    columns = ["NAME", "TYPE", "ENGINE", "CONNECTION_DATA"]

    @classmethod
    def get_data(cls, session, inf_schema, **kwargs):
        """Return one row per entry of ``database_controller.get_list``.

        :param session: provides ``show_secrets``, forwarded as ``with_secrets``
        :param inf_schema: provides ``database_controller``
        :param kwargs: ignored
        :return: DataFrame whose columns are ``cls.columns``
        """
        databases = inf_schema.database_controller.get_list(with_secrets=session.show_secrets)

        rows = []
        for db in databases:
            # connection_data is optional; to_json passes a missing (None) value through.
            rows.append([db["name"], db["type"], db["engine"], to_json(db.get("connection_data"))])

        return pd.DataFrame(rows, columns=cls.columns)

119 

120 

class MLEnginesTable(MdbTable):
    """``ML_ENGINES`` table: the integrations whose ``type`` is ``"ml"``."""

    name = "ML_ENGINES"
    columns = ["NAME", "HANDLER", "CONNECTION_DATA"]

    @classmethod
    def get_data(cls, session, inf_schema, **kwargs):
        """Return one row per ML integration.

        :param session: provides ``show_secrets``, forwarded to ``get_all``
        :param inf_schema: provides ``integration_controller``
        :param kwargs: ignored
        :return: DataFrame whose columns are ``cls.columns``
        """
        integrations = inf_schema.integration_controller.get_all(show_secrets=session.show_secrets)

        # Filter first, then build rows; the integration keys are not needed.
        ml_engines = [info for info in integrations.values() if info["type"] == "ml"]
        rows = [
            [info["name"], info.get("engine"), to_json(info.get("connection_data"))]
            for info in ml_engines
        ]

        return pd.DataFrame(rows, columns=cls.columns)

136 

137 

class HandlersTable(MdbTable):
    """``HANDLERS`` table built from the handlers' import status."""

    name = "HANDLERS"
    columns = [
        "NAME",
        "TYPE",
        "TITLE",
        "DESCRIPTION",
        "VERSION",
        "CONNECTION_ARGS",
        "IMPORT_SUCCESS",
        "IMPORT_ERROR",
    ]

    @classmethod
    def get_data(cls, inf_schema, **kwargs):
        """Return one row per handler reported by ``get_handlers_import_status``.

        :param inf_schema: provides ``integration_controller``
        :param kwargs: ignored
        :return: DataFrame whose columns are ``cls.columns``
        """
        handlers = inf_schema.integration_controller.get_handlers_import_status()

        rows = []
        for info in handlers.values():
            # to_json passes None through, so missing connection args stay None.
            connection_args = to_json(info.get("connection_args"))
            import_status = info.get("import", {})
            rows.append(
                [
                    info["name"],
                    info.get("type"),
                    info.get("title"),
                    info.get("description"),
                    info.get("version"),
                    connection_args,
                    import_status.get("success"),
                    import_status.get("error_message"),
                ]
            )

        return pd.DataFrame(rows, columns=cls.columns)

177 

178 

class JobsTable(MdbTable):
    """``JOBS`` table built from ``JobsController.get_list``."""

    name = "JOBS"
    columns = [
        "NAME",
        "PROJECT",
        "START_AT",
        "END_AT",
        "NEXT_RUN_AT",
        "SCHEDULE_STR",
        "QUERY",
        "IF_QUERY",
        "VARIABLES",
    ]

    @classmethod
    def get_data(cls, query: ASTNode = None, **kwargs):
        """Return the jobs, optionally restricted to a single project.

        :param query: parsed statement; a ``WHERE project = <constant>`` filter
            (as recognised by ``get_project_name``) selects the project
        :param kwargs: ignored
        :return: DataFrame whose columns are ``cls.columns``
        """
        jobs_controller = JobsController()

        # Use the shared helper rather than repeating the WHERE-clause inspection here.
        project_name = get_project_name(query)
        jobs = jobs_controller.get_list(project_name)

        # Each job is looked up by the lower-cased column names.
        columns_lower = [col.lower() for col in cls.columns]
        data = [[job[key] for key in columns_lower] for job in jobs]

        return pd.DataFrame(data, columns=cls.columns)

216 

217 

class TriggersTable(MdbTable):
    """``TRIGGERS`` table built from ``TriggersController.get_list``."""

    name = "TRIGGERS"
    columns = [
        "TRIGGER_CATALOG",
        "TRIGGER_SCHEMA",
        "TRIGGER_NAME",
        "EVENT_MANIPULATION",
        "EVENT_OBJECT_CATALOG",
        "EVENT_OBJECT_SCHEMA",
        "EVENT_OBJECT_TABLE",
        "ACTION_ORDER",
        "ACTION_CONDITION",
        "ACTION_STATEMENT",
        "ACTION_ORIENTATION",
        "ACTION_TIMING",
        "ACTION_REFERENCE_OLD_TABLE",
        "ACTION_REFERENCE_NEW_TABLE",
        "ACTION_REFERENCE_OLD_ROW",
        "ACTION_REFERENCE_NEW_ROW",
        "CREATED",
        "SQL_MODE",
        "DEFINER",
        "CHARACTER_SET_CLIENT",
        "COLLATION_CONNECTION",
        "DATABASE_COLLATION",
    ]

    # Columns always returned; ``columns`` above is appended only for the "sql" API type.
    mindsdb_columns = ["NAME", "PROJECT", "DATABASE", "TABLE", "QUERY", "LAST_ERROR"]

    @classmethod
    def get_data(cls, query: ASTNode = None, inf_schema=None, **kwargs):
        """Return the triggers, optionally restricted to a single project.

        :param query: parsed statement; a ``WHERE project = <constant>`` filter
            (as recognised by ``get_project_name``) selects the project
        :param inf_schema: must expose ``session.api_type``; despite the ``None``
            default it is dereferenced unconditionally
        :param kwargs: ignored
        :return: DataFrame with ``mindsdb_columns``, followed by ``columns`` when
            the session API type is ``"sql"``
        """
        from mindsdb.interfaces.triggers.triggers_controller import TriggersController

        triggers_controller = TriggersController()

        # Use the shared helper rather than repeating the WHERE-clause inspection here.
        project_name = get_project_name(query)
        triggers = triggers_controller.get_list(project_name)

        columns = cls.mindsdb_columns
        if inf_schema.session.api_type == "sql":
            columns = columns + cls.columns
        columns_lower = [col.lower() for col in columns]

        # .get() is used for every lookup, so a missing key is tolerated rather than raising.
        data = [[trigger.get(key) for key in columns_lower] for trigger in triggers]

        return pd.DataFrame(data, columns=columns)

274 

275 

class ChatbotsTable(MdbTable):
    """``CHATBOTS`` table built from ``ChatBotController.get_chatbots``."""

    name = "CHATBOTS"
    columns = [
        "NAME",
        "PROJECT",
        "DATABASE",
        "MODEL_NAME",
        "PARAMS",
        "IS_RUNNING",
        "LAST_ERROR",
        "WEBHOOK_TOKEN",
    ]

    @classmethod
    def get_data(cls, query: ASTNode = None, **kwargs):
        """Return the chatbots, optionally restricted to a single project.

        :param query: parsed statement; a ``WHERE project = <constant>`` filter
            (as recognised by ``get_project_name``) selects the project
        :param kwargs: ignored
        :return: DataFrame whose columns are ``cls.columns``; ``PARAMS`` is
            passed through ``to_json``
        """
        from mindsdb.interfaces.chatbot.chatbot_controller import ChatBotController

        chatbot_controller = ChatBotController()

        # Use the shared helper rather than repeating the WHERE-clause inspection here.
        project_name = get_project_name(query)
        chatbot_data = chatbot_controller.get_chatbots(project_name=project_name)

        columns_lower = [col.lower() for col in cls.columns]

        # Serialize "params" while building each row instead of writing it back
        # into the controller's dict, so the returned records are not mutated.
        data = [
            [to_json(bot[key]) if key == "params" else bot[key] for key in columns_lower]
            for bot in chatbot_data
        ]

        return pd.DataFrame(data, columns=cls.columns)

317 

318 

class KBTable(MdbTable):
    """``KNOWLEDGE_BASES`` table, enriched with the state of each KB's linked query."""

    name = "KNOWLEDGE_BASES"
    columns = [
        "NAME",
        "PROJECT",
        "EMBEDDING_MODEL",
        "RERANKING_MODEL",
        "STORAGE",
        "METADATA_COLUMNS",
        "CONTENT_COLUMNS",
        "ID_COLUMN",
        "PARAMS",
        "INSERT_STARTED_AT",
        "INSERT_FINISHED_AT",
        "PROCESSED_ROWS",
        "ERROR",
        "QUERY_ID",
    ]

    @classmethod
    def get_data(cls, query: ASTNode = None, inf_schema=None, **kwargs):
        """Return the knowledge bases, optionally restricted to a single project.

        :param query: parsed statement; a ``WHERE project = <constant>`` filter
            (as recognised by ``get_project_name``) selects the project
        :param inf_schema: must expose ``session``, which is handed to the controller
        :param kwargs: ignored
        :return: DataFrame whose columns are ``cls.columns``
        """
        project_name = get_project_name(query)

        from mindsdb.interfaces.knowledge_base.controller import KnowledgeBaseController

        knowledge_bases = KnowledgeBaseController(inf_schema.session).list(project_name)

        # All queries are fetched at once and indexed by id.
        queries_by_id = {item["id"]: item for item in query_context_controller.list_queries()}

        rows = []
        for kb in knowledge_bases:
            query_id = kb["query_id"]
            if query_id is None:
                query_item = {}
            elif query_id in queries_by_id:
                query_item = queries_by_id[query_id]
            else:
                # The referenced query is no longer listed: report no query at all.
                query_item = {}
                query_id = None

            storage = kb["vector_database"] + "." + kb["vector_database_table"]
            rows.append(
                (
                    kb["name"],
                    kb["project_name"],
                    to_json(kb["embedding_model"]),
                    to_json(kb["reranking_model"]),
                    storage,
                    to_json(kb["metadata_columns"]),
                    to_json(kb["content_columns"]),
                    kb["id_column"],
                    to_json(kb["params"]),
                    query_item.get("started_at"),
                    query_item.get("finished_at"),
                    query_item.get("processed_rows"),
                    query_item.get("error"),
                    query_id,
                )
            )

        return pd.DataFrame(rows, columns=cls.columns)

381 

382 

class SkillsTable(MdbTable):
    """``SKILLS`` table built from ``SkillsController.get_skills``."""

    name = "SKILLS"
    columns = ["NAME", "PROJECT", "TYPE", "PARAMS"]

    @classmethod
    def get_data(cls, query: ASTNode = None, **kwargs):
        """Return the skills, optionally restricted to a single project.

        :param query: parsed statement; a ``WHERE project = <constant>`` filter
            (as recognised by ``get_project_name``) selects the project
        :param kwargs: ignored
        :return: DataFrame whose columns are ``cls.columns``
        """
        skills = SkillsController().get_skills(get_project_name(query))

        # Skills carry only a project id; resolve it to the project's name.
        project_name_by_id = {project.id: project.name for project in ProjectController().get_list()}

        rows = []
        for skill in skills:
            rows.append((skill.name, project_name_by_id[skill.project_id], skill.type, skill.params))

        return pd.DataFrame(rows, columns=cls.columns)

401 

402 

class AgentsTable(MdbTable):
    """``AGENTS`` table built from ``AgentsController.get_agents``."""

    name = "AGENTS"
    columns = ["NAME", "PROJECT", "MODEL_NAME", "SKILLS", "PARAMS"]

    @classmethod
    def get_data(cls, query: ASTNode = None, inf_schema=None, **kwargs):
        """Return the agents, optionally restricted to a single project.

        :param query: parsed statement; a ``WHERE project = <constant>`` filter
            (as recognised by ``get_project_name``) selects the project
        :param inf_schema: accepted but not used here
        :param kwargs: ignored
        :return: DataFrame whose columns are ``cls.columns``; ``SKILLS`` holds a
            list of skill names and ``PARAMS`` is passed through ``to_json``
        """
        agents = AgentsController().get_agents(get_project_name(query))

        # Agents carry only a project id; resolve it to the project's name.
        project_name_by_id = {project.id: project.name for project in ProjectController().get_list()}

        rows = []
        for agent in agents:
            rows.append(
                (
                    agent.name,
                    project_name_by_id[agent.project_id],
                    agent.model_name,
                    [relation.skill.name for relation in agent.skills_relationships],
                    to_json(agent.params),
                )
            )

        return pd.DataFrame(rows, columns=cls.columns)

429 

430 

class ViewsTable(MdbTable):
    """``VIEWS`` table built from ``ViewController.list``."""

    name = "VIEWS"
    columns = ["NAME", "PROJECT", "QUERY"]

    @classmethod
    def get_data(cls, query: ASTNode = None, **kwargs):
        """Return the views, optionally restricted to a single project.

        :param query: parsed statement; a ``WHERE project = <constant>`` filter
            (as recognised by ``get_project_name``) selects the project
        :param kwargs: ignored
        :return: DataFrame whose columns are ``cls.columns``
        """
        views = ViewController().list(get_project_name(query))

        # Each view is looked up by the lower-cased column names.
        keys = [column.lower() for column in cls.columns]

        rows = []
        for view in views:
            rows.append([view[key] for key in keys])

        return pd.DataFrame(rows, columns=cls.columns)

447 

448 

class QueriesTable(MdbTable):
    """``QUERIES`` table built from ``query_context_controller.list_queries``."""

    name = "QUERIES"
    columns = [
        "ID",
        "STARTED_AT",
        "FINISHED_AT",
        "PROCESSED_ROWS",
        "ERROR",
        "SQL",
        "DATABASE",
        "PARAMETERS",
        "CONTEXT",
        "UPDATED_AT",
    ]

    @classmethod
    def get_data(cls, **kwargs):
        """
        Returns all queries in progress or recently completed.
        Only queries marked as is_resumable by the planner are stored in this table.

        :param kwargs: ignored
        :return: DataFrame whose columns are ``cls.columns``
        """
        queries = query_context_controller.list_queries()

        # Each query record is looked up by the lower-cased column names.
        keys = [column.lower() for column in cls.columns]

        rows = []
        for record in queries:
            rows.append([record[key] for key in keys])

        return pd.DataFrame(rows, columns=cls.columns)