Coverage for mindsdb / api / http / namespaces / databases.py: 69%

274 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import shutil 

2import tempfile 

3from http import HTTPStatus 

4from typing import Dict 

5from pathlib import Path 

6 

7from flask import request 

8from flask_restx import Resource 

9 

10from mindsdb.api.http.utils import http_error 

11from mindsdb.api.http.namespaces.configs.databases import ns_conf 

12from mindsdb.api.mysql.mysql_proxy.classes.fake_mysql_proxy import FakeMysqlProxy 

13from mindsdb.api.executor.controllers.session_controller import SessionController 

14from mindsdb.api.executor.datahub.classes.tables_row import TablesRow 

15from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE 

16from mindsdb.metrics.metrics import api_endpoint_metrics 

17from mindsdb_sql_parser import parse_sql, ParsingException 

18from mindsdb_sql_parser.ast import CreateTable, DropTables 

19from mindsdb.utilities.exception import EntityNotExistsError 

20from mindsdb.integrations.libs.response import HandlerStatusResponse 

21from mindsdb.utilities import log 

22 

23 

24logger = log.getLogger(__name__) 

25 

26 

27@ns_conf.route("/") 

28class DatabasesResource(Resource): 

29 @ns_conf.doc("list_databases") 

30 @api_endpoint_metrics("GET", "/databases") 

31 def get(self): 

32 """List all databases""" 

33 session = SessionController() 

34 return session.database_controller.get_list() 

35 

36 @ns_conf.doc("create_database") 

37 @api_endpoint_metrics("POST", "/databases") 

38 def post(self): 

39 """Create a database""" 

40 if "database" not in request.json: 

41 return http_error( 

42 HTTPStatus.BAD_REQUEST, "Wrong argument", 'Must provide "database" parameter in POST body' 

43 ) 

44 check_connection = request.json.get("check_connection", False) 

45 session = SessionController() 

46 database = request.json["database"] 

47 parameters = {} 

48 if "name" not in database: 

49 return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", 'Missing "name" field for database') 

50 if "engine" not in database: 

51 return http_error( 

52 HTTPStatus.BAD_REQUEST, 

53 "Wrong argument", 

54 'Missing "engine" field for database. If you want to create a project instead, use the /api/projects endpoint.', 

55 ) 

56 if "parameters" in database: 56 ↛ 58line 56 didn't jump to line 58 because the condition on line 56 was always true

57 parameters = database["parameters"] 

58 name = database["name"] 

59 

60 if session.database_controller.exists(name): 

61 return http_error(HTTPStatus.CONFLICT, "Name conflict", f"Database with name {name} already exists.") 

62 

63 storage = None 

64 if check_connection: 64 ↛ 65line 64 didn't jump to line 65 because the condition on line 64 was never true

65 try: 

66 handler = session.integration_controller.create_tmp_handler(name, database["engine"], parameters) 

67 status = handler.check_connection() 

68 except ImportError as import_error: 

69 status = HandlerStatusResponse(success=False, error_message=str(import_error)) 

70 

71 if status.success is not True: 

72 if hasattr(status, "redirect_url") and isinstance(status, str): 

73 return { 

74 "status": "redirect_required", 

75 "redirect_url": status.redirect_url, 

76 "detail": status.error_message, 

77 }, HTTPStatus.OK 

78 return {"status": "connection_error", "detail": status.error_message}, HTTPStatus.OK 

79 

80 if status.copy_storage: 

81 storage = handler.handler_storage.export_files() 

82 

83 new_integration_id = session.integration_controller.add(name, database["engine"], parameters) 

84 

85 if storage: 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true

86 handler = session.integration_controller.get_data_handler(name, connect=False) 

87 handler.handler_storage.import_files(storage) 

88 

89 new_integration = session.database_controller.get_integration(new_integration_id) 

90 return new_integration, HTTPStatus.CREATED 

91 

92 

93@ns_conf.route("/status") 

94class DatabasesStatusResource(Resource): 

95 @ns_conf.doc("check_database_connection_status") 

96 @api_endpoint_metrics("POST", "/databases/status") 

97 def post(self): 

98 """Check the connection parameters for a database""" 

99 data = {} 

100 if request.content_type == "application/json": 

101 data.update(request.json or {}) 

102 elif request.content_type.startswith("multipart/form-data"): 

103 data.update(request.form or {}) 

104 

105 if "engine" not in data: 

106 return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", 'Missing "engine" field for database') 

107 

108 engine = data["engine"] 

109 parameters = data 

110 del parameters["engine"] 

111 

112 files = request.files 

113 temp_dir = None 

114 if files is not None and len(files) > 0: 

115 temp_dir = tempfile.mkdtemp(prefix="integration_files_") 

116 for key, file in files.items(): 

117 temp_dir_path = Path(temp_dir) 

118 file_name = Path(file.filename) 

119 file_path = temp_dir_path.joinpath(file_name).resolve() 

120 if temp_dir_path not in file_path.parents: 

121 raise Exception(f"Can not save file at path: {file_path}") 

122 file.save(file_path) 

123 parameters[key] = str(file_path) 

124 

125 session = SessionController() 

126 

127 try: 

128 handler = session.integration_controller.create_tmp_handler("test_connection", engine, parameters) 

129 status = handler.check_connection() 

130 except ImportError as import_error: 

131 status = HandlerStatusResponse(success=False, error_message=str(import_error)) 

132 except Exception as unknown_error: 

133 status = HandlerStatusResponse(success=False, error_message=str(unknown_error)) 

134 finally: 

135 if temp_dir is not None: 

136 shutil.rmtree(temp_dir) 

137 

138 if not status.success: 

139 if hasattr(status, "redirect_url") and isinstance(status, str): 

140 return { 

141 "status": "redirect_required", 

142 "redirect_url": status.redirect_url, 

143 "detail": status.error_message, 

144 }, HTTPStatus.OK 

145 return {"status": "connection_error", "detail": status.error_message}, HTTPStatus.OK 

146 

147 return { 

148 "status": "success", 

149 }, HTTPStatus.OK 

150 

151 

152@ns_conf.route("/<database_name>") 

153class DatabaseResource(Resource): 

154 @ns_conf.doc("get_database") 

155 @api_endpoint_metrics("GET", "/databases/database") 

156 def get(self, database_name): 

157 """Gets a database by name""" 

158 session = SessionController() 

159 check_connection = request.args.get("check_connection", "false").lower() in ("1", "true") 

160 try: 

161 project = session.database_controller.get_project(database_name) 

162 result = {"name": database_name, "type": "project", "id": project.id, "engine": None} 

163 if check_connection: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true

164 result["connection_status"] = {"success": True, "error_message": None} 

165 except (ValueError, EntityNotExistsError): 

166 integration = session.integration_controller.get(database_name) 

167 if integration is None: 

168 return http_error( 

169 HTTPStatus.NOT_FOUND, "Database not found", f"Database with name {database_name} does not exist." 

170 ) 

171 result = integration 

172 if check_connection: 172 ↛ 173line 172 didn't jump to line 173 because the condition on line 172 was never true

173 integration["connection_status"] = {"success": False, "error_message": None} 

174 try: 

175 handler = session.integration_controller.get_data_handler(database_name) 

176 status = handler.check_connection() 

177 integration["connection_status"]["success"] = status.success 

178 integration["connection_status"]["error_message"] = status.error_message 

179 except Exception as e: 

180 integration["connection_status"]["success"] = False 

181 integration["connection_status"]["error_message"] = str(e) 

182 

183 return result 

184 

185 @ns_conf.doc("update_database") 

186 @api_endpoint_metrics("PUT", "/databases/database") 

187 def put(self, database_name): 

188 """Updates or creates a database""" 

189 if "database" not in request.json: 

190 return http_error( 

191 HTTPStatus.BAD_REQUEST, "Wrong argument", 'Must provide "database" parameter in POST body' 

192 ) 

193 

194 session = SessionController() 

195 parameters = {} 

196 database = request.json["database"] 

197 check_connection = request.json.get("check_connection", False) 

198 

199 if "parameters" in database: 199 ↛ 201line 199 didn't jump to line 201 because the condition on line 199 was always true

200 parameters = database["parameters"] 

201 if not session.database_controller.exists(database_name): 

202 # Create. 

203 if "engine" not in database: 203 ↛ 204line 203 didn't jump to line 204 because the condition on line 203 was never true

204 return http_error( 

205 HTTPStatus.BAD_REQUEST, 

206 "Wrong argument", 

207 'Missing "engine" field for new database. ' 

208 "If you want to create a project instead, use the POST /api/projects endpoint.", 

209 ) 

210 new_integration_id = session.integration_controller.add(database_name, database["engine"], parameters) 

211 new_integration = session.database_controller.get_integration(new_integration_id) 

212 return new_integration, HTTPStatus.CREATED 

213 

214 try: 

215 session.integration_controller.modify(database_name, parameters, check_connection=check_connection) 

216 except Exception as e: 

217 status = HandlerStatusResponse(success=False, error_message=str(e)) 

218 return http_error(HTTPStatus.BAD_REQUEST, "Connection error", status.error_message or "Connection error") 

219 

220 return session.integration_controller.get(database_name) 

221 

222 @ns_conf.doc("delete_database") 

223 @api_endpoint_metrics("DELETE", "/databases/database") 

224 def delete(self, database_name): 

225 """Deletes a database by name""" 

226 session = SessionController() 

227 if not session.database_controller.exists(database_name): 

228 return http_error( 

229 HTTPStatus.NOT_FOUND, "Database not found", f"Database with name {database_name} does not exist." 

230 ) 

231 try: 

232 session.database_controller.delete(database_name) 

233 except Exception as e: 

234 logger.debug(f"Error while deleting database '{database_name}'", exc_info=True) 

235 return http_error( 

236 HTTPStatus.BAD_REQUEST, 

237 "Error", 

238 f"Cannot delete database {database_name}. " 

239 + "This is most likely a system database, a permanent integration, or an ML engine with active models. " 

240 + f"Full error: {e}. " 

241 + "Please check the name and try again.", 

242 ) 

243 return "", HTTPStatus.NO_CONTENT 

244 

245 

246def _tables_row_to_obj(table_row: TablesRow) -> Dict: 

247 type = table_row.TABLE_TYPE.lower() 

248 if table_row.TABLE_TYPE == "BASE TABLE": 248 ↛ 250line 248 didn't jump to line 250 because the condition on line 248 was always true

249 type = "data" 

250 return {"name": table_row.TABLE_NAME, "type": type} 

251 

252 

253@ns_conf.route("/<database_name>/tables") 

254class TablesList(Resource): 

255 @ns_conf.doc("list_tables") 

256 @api_endpoint_metrics("GET", "/databases/database/tables") 

257 def get(self, database_name): 

258 """Get all tables in a database""" 

259 session = SessionController() 

260 datanode = session.datahub.get(database_name) 

261 all_tables = datanode.get_tables() 

262 table_objs = [_tables_row_to_obj(t) for t in all_tables] 

263 return table_objs 

264 

265 @ns_conf.doc("create_table") 

266 @api_endpoint_metrics("POST", "/databases/database/tables") 

267 def post(self, database_name): 

268 """Creates a table in a database""" 

269 if "table" not in request.json: 

270 return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", 'Must provide "table" parameter in POST body') 

271 table = request.json["table"] 

272 if "name" not in table: 

273 return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", 'Missing "name" field for table') 

274 if "select" not in table: 

275 return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", 'Missing "select" field for table') 

276 table_name = table["name"] 

277 select_query = table["select"] 

278 replace = False 

279 if "replace" in table: 279 ↛ 282line 279 didn't jump to line 282 because the condition on line 279 was always true

280 replace = table["replace"] 

281 

282 session = SessionController() 

283 try: 

284 session.database_controller.get_project(database_name) 

285 error_message = ( 

286 f"Database {database_name} is a project. " 

287 + f"If you want to create a model or view, use the projects/{database_name}/models/{table_name} or " 

288 + f"projects/{database_name}/views/{table_name} endpoints instead." 

289 ) 

290 return http_error(HTTPStatus.BAD_REQUEST, "Error", error_message) 

291 except EntityNotExistsError: 

292 # Only support creating tables from integrations. 

293 pass 

294 

295 datanode = session.datahub.get(database_name) 

296 if datanode is None: 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true

297 return http_error( 

298 HTTPStatus.NOT_FOUND, "Database not exists", f"Database with name {database_name} does not exist" 

299 ) 

300 all_tables = datanode.get_tables() 

301 for t in all_tables: 

302 if t.TABLE_NAME == table_name and not replace: 

303 return http_error(HTTPStatus.CONFLICT, "Name conflict", f"Table with name {table_name} already exists") 

304 

305 try: 

306 select_ast = parse_sql(select_query) 

307 except ParsingException: 

308 return http_error(HTTPStatus.BAD_REQUEST, "Error", f"Could not parse select statement {select_query}") 

309 

310 create_ast = CreateTable(f"{database_name}.{table_name}", from_select=select_ast, is_replace=replace) 

311 

312 mysql_proxy = FakeMysqlProxy() 

313 

314 try: 

315 mysql_proxy.process_query(create_ast.get_string()) 

316 except Exception as e: 

317 return http_error(HTTPStatus.BAD_REQUEST, "Error", str(e)) 

318 

319 all_tables = datanode.get_tables() 

320 try: 

321 matching_table = next(t for t in all_tables if t.TABLE_NAME == table_name) 

322 return _tables_row_to_obj(matching_table), HTTPStatus.CREATED 

323 except StopIteration: 

324 return http_error( 

325 HTTPStatus.INTERNAL_SERVER_ERROR, "Error", f"Table with name {table_name} could not be created" 

326 ) 

327 

328 

329@ns_conf.route("/<database_name>/tables/<table_name>") 

330@ns_conf.param("database_name", "Name of the database") 

331@ns_conf.param("table_name", "Name of the table") 

332class TableResource(Resource): 

333 @ns_conf.doc("get_table") 

334 @api_endpoint_metrics("GET", "/databases/database/tables/table") 

335 def get(self, database_name, table_name): 

336 session = SessionController() 

337 datanode = session.datahub.get(database_name) 

338 all_tables = datanode.get_tables() 

339 try: 

340 matching_table = next(t for t in all_tables if t.TABLE_NAME == table_name) 

341 return _tables_row_to_obj(matching_table) 

342 except StopIteration: 

343 return http_error(HTTPStatus.NOT_FOUND, "Table not found", f"Table with name {table_name} not found") 

344 

345 @ns_conf.doc("drop_table") 

346 @api_endpoint_metrics("DELETE", "/databases/database/tables/table") 

347 def delete(self, database_name, table_name): 

348 session = SessionController() 

349 try: 

350 session.database_controller.get_project(database_name) 

351 error_message = ( 

352 f"Database {database_name} is a project. " 

353 + f"If you want to delete a model or view, use the projects/{database_name}/models/{table_name} or " 

354 + f"projects/{database_name}/views/{table_name} endpoints instead." 

355 ) 

356 return http_error(HTTPStatus.BAD_REQUEST, "Error", error_message) 

357 except EntityNotExistsError: 

358 # Only support dropping tables from integrations. 

359 pass 

360 

361 datanode = session.datahub.get(database_name) 

362 if datanode is None: 

363 return http_error( 

364 HTTPStatus.NOT_FOUND, "Database not found", f"Database with name {database_name} not found" 

365 ) 

366 all_tables = datanode.get_tables() 

367 try: 

368 next(t for t in all_tables if t.TABLE_NAME == table_name) 

369 except StopIteration: 

370 return http_error(HTTPStatus.NOT_FOUND, "Table not found", f"Table with name {table_name} not found") 

371 

372 drop_ast = DropTables(tables=[table_name], if_exists=True) 

373 

374 try: 

375 integration_handler = session.integration_controller.get_data_handler(database_name) 

376 except Exception: 

377 return http_error( 

378 HTTPStatus.INTERNAL_SERVER_ERROR, "Error", f"Could not get database handler for {database_name}" 

379 ) 

380 try: 

381 result = integration_handler.query(drop_ast) 

382 except NotImplementedError: 

383 return http_error( 

384 HTTPStatus.BAD_REQUEST, "Error", f"Database {database_name} does not support dropping tables." 

385 ) 

386 if result.type == RESPONSE_TYPE.ERROR: 386 ↛ 387line 386 didn't jump to line 387 because the condition on line 386 was never true

387 return http_error(HTTPStatus.BAD_REQUEST, "Error", result.error_message) 

388 return "", HTTPStatus.NO_CONTENT