Coverage for mindsdb / api / http / namespaces / databases.py: 69%
274 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import shutil
2import tempfile
3from http import HTTPStatus
4from typing import Dict
5from pathlib import Path
7from flask import request
8from flask_restx import Resource
10from mindsdb.api.http.utils import http_error
11from mindsdb.api.http.namespaces.configs.databases import ns_conf
12from mindsdb.api.mysql.mysql_proxy.classes.fake_mysql_proxy import FakeMysqlProxy
13from mindsdb.api.executor.controllers.session_controller import SessionController
14from mindsdb.api.executor.datahub.classes.tables_row import TablesRow
15from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE
16from mindsdb.metrics.metrics import api_endpoint_metrics
17from mindsdb_sql_parser import parse_sql, ParsingException
18from mindsdb_sql_parser.ast import CreateTable, DropTables
19from mindsdb.utilities.exception import EntityNotExistsError
20from mindsdb.integrations.libs.response import HandlerStatusResponse
21from mindsdb.utilities import log
# Module-level logger obtained from the project's logging helper, keyed by this module's name.
24logger = log.getLogger(__name__)
@ns_conf.route("/")
class DatabasesResource(Resource):
    """Collection endpoint: list the known databases or register a new one."""

    @ns_conf.doc("list_databases")
    @api_endpoint_metrics("GET", "/databases")
    def get(self):
        """List all databases"""
        session = SessionController()
        return session.database_controller.get_list()

    @ns_conf.doc("create_database")
    @api_endpoint_metrics("POST", "/databases")
    def post(self):
        """Create a database

        The JSON body must contain a "database" object with "name" and
        "engine" fields and, optionally, "parameters". A top-level
        "check_connection" flag (default False) asks for the connection to be
        verified with a temporary handler before the integration is stored.

        Returns:
            400 when a required field is missing, 409 when the name is already
            taken, 200 with a "connection_error" / "redirect_required" status
            when the connection check fails, otherwise the stored integration
            with 201.
        """
        if "database" not in request.json:
            return http_error(
                HTTPStatus.BAD_REQUEST, "Wrong argument", 'Must provide "database" parameter in POST body'
            )
        check_connection = request.json.get("check_connection", False)
        session = SessionController()
        database = request.json["database"]
        if "name" not in database:
            return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", 'Missing "name" field for database')
        if "engine" not in database:
            return http_error(
                HTTPStatus.BAD_REQUEST,
                "Wrong argument",
                'Missing "engine" field for database. If you want to create a project instead, use the /api/projects endpoint.',
            )
        parameters = database.get("parameters", {})
        name = database["name"]

        if session.database_controller.exists(name):
            return http_error(HTTPStatus.CONFLICT, "Name conflict", f"Database with name {name} already exists.")

        storage = None
        if check_connection:
            try:
                handler = session.integration_controller.create_tmp_handler(name, database["engine"], parameters)
                status = handler.check_connection()
            except ImportError as import_error:
                # Only a missing dependency is turned into a failed status here;
                # any other exception propagates to the caller.
                status = HandlerStatusResponse(success=False, error_message=str(import_error))

            if status.success is not True:
                # The redirect is signalled by a string `redirect_url` on the
                # status object. The previous check tested the type of the
                # status object itself, so this branch could never be taken.
                redirect_url = getattr(status, "redirect_url", None)
                if isinstance(redirect_url, str):
                    return {
                        "status": "redirect_required",
                        "redirect_url": redirect_url,
                        "detail": status.error_message,
                    }, HTTPStatus.OK
                return {"status": "connection_error", "detail": status.error_message}, HTTPStatus.OK

            if status.copy_storage:
                # Keep the files produced by the temporary handler so they can
                # be imported into the permanent handler's storage below.
                storage = handler.handler_storage.export_files()

        new_integration_id = session.integration_controller.add(name, database["engine"], parameters)

        if storage:
            handler = session.integration_controller.get_data_handler(name, connect=False)
            handler.handler_storage.import_files(storage)

        new_integration = session.database_controller.get_integration(new_integration_id)
        return new_integration, HTTPStatus.CREATED
@ns_conf.route("/status")
class DatabasesStatusResource(Resource):
    """Endpoint that tests connection parameters without storing an integration."""

    @ns_conf.doc("check_database_connection_status")
    @api_endpoint_metrics("POST", "/databases/status")
    def post(self):
        """Check the connection parameters for a database

        Accepts either a JSON body or a multipart form. The "engine" field is
        required; every other field is passed to the handler as a connection
        parameter. Uploaded files are written to a temporary directory and
        their paths are passed as parameters under the upload's field name.

        Returns:
            400 when "engine" is missing; otherwise 200 with a "status" of
            "success", "connection_error" or "redirect_required".
        """
        data = {}
        # Compare on the bare media type so that a missing Content-Type header
        # or a trailing "; charset=..." parameter does not break body parsing.
        media_type = (request.content_type or "").split(";", 1)[0].strip().lower()
        if media_type == "application/json":
            data.update(request.json or {})
        elif media_type == "multipart/form-data":
            data.update(request.form or {})

        if "engine" not in data:
            return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", 'Missing "engine" field for database')

        engine = data.pop("engine")
        parameters = data

        files = request.files
        temp_dir = None
        try:
            if files:
                temp_dir = tempfile.mkdtemp(prefix="integration_files_")
                # Resolve the directory as well: the candidate path below is
                # resolved, so comparing against an unresolved directory would
                # reject every upload when the temp dir sits behind a symlink.
                temp_dir_path = Path(temp_dir).resolve()
                for key, file in files.items():
                    file_path = temp_dir_path.joinpath(Path(file.filename)).resolve()
                    # Refuse file names that would escape the temporary directory.
                    if temp_dir_path not in file_path.parents:
                        raise Exception(f"Can not save file at path: {file_path}")
                    file.save(file_path)
                    parameters[key] = str(file_path)

            session = SessionController()

            try:
                handler = session.integration_controller.create_tmp_handler("test_connection", engine, parameters)
                status = handler.check_connection()
            except Exception as error:
                # Any failure (including a missing handler dependency) is
                # reported to the client as a failed connection check.
                status = HandlerStatusResponse(success=False, error_message=str(error))
        finally:
            # The cleanup wraps the upload step too, so the directory is
            # removed even when saving a file or the path check above raises.
            if temp_dir is not None:
                shutil.rmtree(temp_dir)

        if not status.success:
            # The redirect is signalled by a string `redirect_url` on the
            # status object. The previous check tested the type of the status
            # object itself, so this branch could never be taken.
            redirect_url = getattr(status, "redirect_url", None)
            if isinstance(redirect_url, str):
                return {
                    "status": "redirect_required",
                    "redirect_url": redirect_url,
                    "detail": status.error_message,
                }, HTTPStatus.OK
            return {"status": "connection_error", "detail": status.error_message}, HTTPStatus.OK

        return {
            "status": "success",
        }, HTTPStatus.OK
@ns_conf.route("/<database_name>")
class DatabaseResource(Resource):
    """Item endpoint: read, create/update or delete a single database by name."""

    @ns_conf.doc("get_database")
    @api_endpoint_metrics("GET", "/databases/database")
    def get(self, database_name):
        """Gets a database by name

        The name is looked up as a project first and as an integration second.
        With the query argument ``check_connection`` set to "1" or "true", a
        "connection_status" entry is added to the result.

        Returns:
            A dict describing the project or integration, or 404 when neither
            exists.
        """
        session = SessionController()
        check_connection = request.args.get("check_connection", "false").lower() in ("1", "true")
        try:
            project = session.database_controller.get_project(database_name)
            result = {"name": database_name, "type": "project", "id": project.id, "engine": None}
            if check_connection:
                # Projects have no external connection to test.
                result["connection_status"] = {"success": True, "error_message": None}
        except (ValueError, EntityNotExistsError):
            # Not a project: fall back to the integration lookup.
            integration = session.integration_controller.get(database_name)
            if integration is None:
                return http_error(
                    HTTPStatus.NOT_FOUND, "Database not found", f"Database with name {database_name} does not exist."
                )
            result = integration
            if check_connection:
                connection_status = {"success": False, "error_message": None}
                integration["connection_status"] = connection_status
                try:
                    handler = session.integration_controller.get_data_handler(database_name)
                    status = handler.check_connection()
                    connection_status["success"] = status.success
                    connection_status["error_message"] = status.error_message
                except Exception as e:
                    # A failing check is reported in the payload rather than
                    # failing the whole request.
                    connection_status["success"] = False
                    connection_status["error_message"] = str(e)

        return result

    @ns_conf.doc("update_database")
    @api_endpoint_metrics("PUT", "/databases/database")
    def put(self, database_name):
        """Updates or creates a database

        The JSON body must contain a "database" object. When no database with
        this name exists, a new integration is created ("engine" is then
        required). Otherwise the existing integration's parameters are
        modified; "check_connection" is forwarded to that modification only.

        Returns:
            400 on a missing field or a failed modification, 201 with the new
            integration on creation, otherwise the updated integration.
        """
        if "database" not in request.json:
            # This is the PUT handler; the message previously referred to a POST body.
            return http_error(
                HTTPStatus.BAD_REQUEST, "Wrong argument", 'Must provide "database" parameter in PUT body'
            )

        session = SessionController()
        database = request.json["database"]
        check_connection = request.json.get("check_connection", False)
        parameters = database.get("parameters", {})

        if not session.database_controller.exists(database_name):
            # Create.
            if "engine" not in database:
                return http_error(
                    HTTPStatus.BAD_REQUEST,
                    "Wrong argument",
                    'Missing "engine" field for new database. '
                    "If you want to create a project instead, use the POST /api/projects endpoint.",
                )
            new_integration_id = session.integration_controller.add(database_name, database["engine"], parameters)
            new_integration = session.database_controller.get_integration(new_integration_id)
            return new_integration, HTTPStatus.CREATED

        try:
            session.integration_controller.modify(database_name, parameters, check_connection=check_connection)
        except Exception as e:
            # Fall back to a generic message when the exception carries no text.
            return http_error(HTTPStatus.BAD_REQUEST, "Connection error", str(e) or "Connection error")

        return session.integration_controller.get(database_name)

    @ns_conf.doc("delete_database")
    @api_endpoint_metrics("DELETE", "/databases/database")
    def delete(self, database_name):
        """Deletes a database by name

        Returns:
            404 when the database does not exist, 400 when deletion fails,
            otherwise an empty body with 204.
        """
        session = SessionController()
        if not session.database_controller.exists(database_name):
            return http_error(
                HTTPStatus.NOT_FOUND, "Database not found", f"Database with name {database_name} does not exist."
            )
        try:
            session.database_controller.delete(database_name)
        except Exception as e:
            logger.debug(f"Error while deleting database '{database_name}'", exc_info=True)
            return http_error(
                HTTPStatus.BAD_REQUEST,
                "Error",
                f"Cannot delete database {database_name}. "
                + "This is most likely a system database, a permanent integration, or an ML engine with active models. "
                + f"Full error: {e}. "
                + "Please check the name and try again.",
            )
        return "", HTTPStatus.NO_CONTENT
246def _tables_row_to_obj(table_row: TablesRow) -> Dict:
247 type = table_row.TABLE_TYPE.lower()
248 if table_row.TABLE_TYPE == "BASE TABLE": 248 ↛ 250line 248 didn't jump to line 250 because the condition on line 248 was always true
249 type = "data"
250 return {"name": table_row.TABLE_NAME, "type": type}
@ns_conf.route("/<database_name>/tables")
class TablesList(Resource):
    """Collection endpoint for the tables of one database."""

    @ns_conf.doc("list_tables")
    @api_endpoint_metrics("GET", "/databases/database/tables")
    def get(self, database_name):
        """Get all tables in a database

        Returns:
            A list of ``{"name", "type"}`` dicts, or 404 when the database is
            unknown.
        """
        session = SessionController()
        datanode = session.datahub.get(database_name)
        if datanode is None:
            # Same guard as in `post`: without it an unknown database name
            # fails with an attribute error instead of a 404.
            return http_error(
                HTTPStatus.NOT_FOUND, "Database not exists", f"Database with name {database_name} does not exist"
            )
        return [_tables_row_to_obj(t) for t in datanode.get_tables()]

    @ns_conf.doc("create_table")
    @api_endpoint_metrics("POST", "/databases/database/tables")
    def post(self, database_name):
        """Creates a table in a database

        The JSON body must contain a "table" object with "name" and "select"
        fields and, optionally, "replace". The table is created from the
        select statement; projects are rejected.

        Returns:
            400 for a missing field, a project target, an unparsable select or
            a failed query; 404 for an unknown database; 409 when the table
            exists and "replace" is not set; 500 when the table cannot be
            found after creation; otherwise the new table with 201.
        """
        if "table" not in request.json:
            return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", 'Must provide "table" parameter in POST body')
        table = request.json["table"]
        if "name" not in table:
            return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", 'Missing "name" field for table')
        if "select" not in table:
            return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", 'Missing "select" field for table')
        table_name = table["name"]
        select_query = table["select"]
        replace = table.get("replace", False)

        session = SessionController()
        try:
            session.database_controller.get_project(database_name)
            error_message = (
                f"Database {database_name} is a project. "
                + f"If you want to create a model or view, use the projects/{database_name}/models/{table_name} or "
                + f"projects/{database_name}/views/{table_name} endpoints instead."
            )
            return http_error(HTTPStatus.BAD_REQUEST, "Error", error_message)
        except EntityNotExistsError:
            # Only support creating tables from integrations.
            pass

        datanode = session.datahub.get(database_name)
        if datanode is None:
            return http_error(
                HTTPStatus.NOT_FOUND, "Database not exists", f"Database with name {database_name} does not exist"
            )
        all_tables = datanode.get_tables()
        if not replace and any(t.TABLE_NAME == table_name for t in all_tables):
            return http_error(HTTPStatus.CONFLICT, "Name conflict", f"Table with name {table_name} already exists")

        try:
            select_ast = parse_sql(select_query)
        except ParsingException:
            return http_error(HTTPStatus.BAD_REQUEST, "Error", f"Could not parse select statement {select_query}")

        create_ast = CreateTable(f"{database_name}.{table_name}", from_select=select_ast, is_replace=replace)

        mysql_proxy = FakeMysqlProxy()

        try:
            mysql_proxy.process_query(create_ast.get_string())
        except Exception as e:
            return http_error(HTTPStatus.BAD_REQUEST, "Error", str(e))

        # Re-read the table list to confirm the table now exists.
        matching_table = next((t for t in datanode.get_tables() if t.TABLE_NAME == table_name), None)
        if matching_table is None:
            return http_error(
                HTTPStatus.INTERNAL_SERVER_ERROR, "Error", f"Table with name {table_name} could not be created"
            )
        return _tables_row_to_obj(matching_table), HTTPStatus.CREATED
@ns_conf.route("/<database_name>/tables/<table_name>")
@ns_conf.param("database_name", "Name of the database")
@ns_conf.param("table_name", "Name of the table")
class TableResource(Resource):
    """Item endpoint: read or drop a single table of a database."""

    @ns_conf.doc("get_table")
    @api_endpoint_metrics("GET", "/databases/database/tables/table")
    def get(self, database_name, table_name):
        """Get a table by name

        Returns:
            The ``{"name", "type"}`` dict for the table, or 404 when the
            database or the table is unknown.
        """
        session = SessionController()
        datanode = session.datahub.get(database_name)
        if datanode is None:
            # Same guard as in `delete`: without it an unknown database name
            # fails with an attribute error instead of a 404.
            return http_error(
                HTTPStatus.NOT_FOUND, "Database not found", f"Database with name {database_name} not found"
            )
        matching_table = next((t for t in datanode.get_tables() if t.TABLE_NAME == table_name), None)
        if matching_table is None:
            return http_error(HTTPStatus.NOT_FOUND, "Table not found", f"Table with name {table_name} not found")
        return _tables_row_to_obj(matching_table)

    @ns_conf.doc("drop_table")
    @api_endpoint_metrics("DELETE", "/databases/database/tables/table")
    def delete(self, database_name, table_name):
        """Drop a table by name

        Projects are rejected; the drop is issued through the database's data
        handler.

        Returns:
            400 for a project target, a handler that does not implement the
            query, or an error response from the handler; 404 for an unknown
            database or table; 500 when the handler cannot be obtained;
            otherwise an empty body with 204.
        """
        session = SessionController()
        try:
            session.database_controller.get_project(database_name)
            error_message = (
                f"Database {database_name} is a project. "
                + f"If you want to delete a model or view, use the projects/{database_name}/models/{table_name} or "
                + f"projects/{database_name}/views/{table_name} endpoints instead."
            )
            return http_error(HTTPStatus.BAD_REQUEST, "Error", error_message)
        except EntityNotExistsError:
            # Only support dropping tables from integrations.
            pass

        datanode = session.datahub.get(database_name)
        if datanode is None:
            return http_error(
                HTTPStatus.NOT_FOUND, "Database not found", f"Database with name {database_name} not found"
            )
        if not any(t.TABLE_NAME == table_name for t in datanode.get_tables()):
            return http_error(HTTPStatus.NOT_FOUND, "Table not found", f"Table with name {table_name} not found")

        drop_ast = DropTables(tables=[table_name], if_exists=True)

        try:
            integration_handler = session.integration_controller.get_data_handler(database_name)
        except Exception:
            return http_error(
                HTTPStatus.INTERNAL_SERVER_ERROR, "Error", f"Could not get database handler for {database_name}"
            )
        try:
            result = integration_handler.query(drop_ast)
        except NotImplementedError:
            return http_error(
                HTTPStatus.BAD_REQUEST, "Error", f"Database {database_name} does not support dropping tables."
            )
        if result.type == RESPONSE_TYPE.ERROR:
            return http_error(HTTPStatus.BAD_REQUEST, "Error", result.error_message)
        return "", HTTPStatus.NO_CONTENT