Coverage for mindsdb / api / http / namespaces / default.py: 26%
78 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from flask import request, session
2from flask_restx import Resource
3from flask_restx import fields
5from mindsdb.__about__ import __version__ as mindsdb_version
6from mindsdb.api.http.namespaces.configs.default import ns_conf
7from mindsdb.api.http.utils import http_error
8from mindsdb.metrics.metrics import api_endpoint_metrics
9from mindsdb.utilities.config import config, HTTP_AUTH_TYPE
10from mindsdb.utilities import log
11from mindsdb.api.common.middleware import generate_pat, revoke_pat, verify_pat
14logger = log.getLogger(__name__)
def check_session_auth() -> bool:
    """Check whether the current session belongs to the configured user.

    Returns:
        bool: True if HTTP auth is disabled (``http_auth_enabled`` is False),
            or if the session carries a username equal to the one in
            ``config["auth"]["username"]``. False otherwise, including when
            reading the config or the session raises.
    """
    try:
        if config["auth"]["http_auth_enabled"] is False:
            return True

        session_username = session.get("username")
        if session_username is None:
            # An anonymous session must never be treated as authenticated,
            # even if the configured username happens to be None as well
            # (None == None would otherwise evaluate to True).
            return False
        return session_username == config["auth"]["username"]
    except Exception:
        # Deliberately broad: any failure while reading the config or the
        # session is reported as "not authenticated" rather than as an error.
        return False
@ns_conf.route("/login", methods=["POST"])
class LoginRoute(Resource):
    """Login endpoint: validates credentials against ``config["auth"]``."""

    @ns_conf.doc(
        responses={200: "Success", 400: "Error in username or password", 401: "Invalid username or password"},
        body=ns_conf.model(
            "request_login",
            {"username": fields.String(description="Username"), "password": fields.String(description="Password")},
        ),
    )
    @api_endpoint_metrics("POST", "/default/login")
    def post(self):
        """Check user's credentials and create a session and/or a token.

        The request body must be a JSON object with non-empty string
        ``username`` and ``password`` values.

        Returns:
            On success, ``(response, 200)`` where ``response`` contains a
            ``"token"`` entry when the configured ``http_auth_type`` is TOKEN
            or SESSION_OR_TOKEN; for SESSION or SESSION_OR_TOKEN the session
            is reset and bound to the username. Otherwise the result of
            ``http_error`` with status 400 (malformed input) or 401
            (credentials do not match the configured ones).
        """
        # Function-scope import keeps this block self-contained.
        import hmac

        payload = request.json
        if not isinstance(payload, dict):
            # A JSON body that is not an object (e.g. a list or null) has no
            # credentials to read; report it like any other malformed input
            # instead of failing on `.get`.
            return http_error(400, "Error in username or password", "Username and password should be string")

        username = payload.get("username")
        password = payload.get("password")
        if not (isinstance(username, str) and username and isinstance(password, str) and password):
            return http_error(400, "Error in username or password", "Username and password should be string")

        expected_username = config["auth"]["username"]
        expected_password = config["auth"]["password"]
        if not (isinstance(expected_username, str) and isinstance(expected_password, str)):
            # Submitted values are always str here, so they can never equal a
            # non-string configured value.
            return http_error(401, "Forbidden", "Invalid username or password")

        def _as_bytes(text):
            # compare_digest only accepts non-ASCII input as bytes;
            # "surrogatepass" keeps lone surrogates from raising on encode.
            return text.encode("utf-8", "surrogatepass")

        # Constant-time comparison of both fields (both are always evaluated)
        # so the response time does not reveal how much of a guess matched.
        username_ok = hmac.compare_digest(_as_bytes(username), _as_bytes(expected_username))
        password_ok = hmac.compare_digest(_as_bytes(password), _as_bytes(expected_password))
        if not (username_ok and password_ok):
            return http_error(401, "Forbidden", "Invalid username or password")

        logger.info(f"User '{username}' logged in successfully")

        response = {}
        auth_type = config["auth"]["http_auth_type"]
        if auth_type in (HTTP_AUTH_TYPE.SESSION, HTTP_AUTH_TYPE.SESSION_OR_TOKEN):
            # Start from a clean session before binding it to the user.
            session.clear()
            session["username"] = username
            session.permanent = True

        if auth_type in (HTTP_AUTH_TYPE.TOKEN, HTTP_AUTH_TYPE.SESSION_OR_TOKEN):
            response["token"] = generate_pat()

        return response, 200
@ns_conf.route("/logout", methods=["POST"])
class LogoutRoute(Resource):
    """Logout endpoint: clears the session and revokes the presented token."""

    @ns_conf.doc(responses={200: "Success"})
    @api_endpoint_metrics("POST", "/default/logout")
    def post(self):
        """Clear the current session and pass any bearer token to ``revoke_pat``.

        Returns:
            tuple: an empty body with status 200.
        """
        session.clear()

        # The session is cleared above; a bearer token, if one was sent in the
        # Authorization header, is handed to revoke_pat separately.
        auth_header = request.headers.get("Authorization")
        token = None
        if auth_header and auth_header.startswith("Bearer "):
            # Everything after the scheme; an empty remainder counts as no token.
            token = auth_header.split(" ", 1)[1].strip() or None

        # NOTE: revoke_pat is called even when no token was supplied (token is None).
        revoke_pat(token)
        return "", 200
@ns_conf.route("/status")
class StatusRoute(Resource):
    """Status endpoint: reports version, environment and auth state."""

    @ns_conf.doc(
        responses={200: "Success"},
        model=ns_conf.model(
            "response_status",
            {
                "environment": fields.String(description="The name of current environment: cloud, local or other"),
                "mindsdb_version": fields.String(description="Current version of mindsdb"),
                "auth": fields.Nested(
                    ns_conf.model(
                        "response_status_auth",
                        {
                            "confirmed": fields.Boolean(description="is current user authenticated"),
                            # Key matches what `get` actually returns.
                            "http_auth_enabled": fields.Boolean(description="is authentication required"),
                            # The provider is a name ("disabled", "local", ...), not a flag.
                            "provider": fields.String(
                                description="current authentication provider: disabled, local or 3rd-party"
                            ),
                        },
                    )
                ),
            },
        ),
    )
    @api_endpoint_metrics("GET", "/default/status")
    def get(self):
        """Return auth and environment data.

        Returns:
            dict: ``mindsdb_version``, ``environment`` and an ``auth`` mapping
                with ``confirmed``, ``http_auth_enabled`` and ``provider``.
        """
        # An explicitly configured environment wins; otherwise derive it
        # from the "cloud" / "aws_marketplace" flags, defaulting to "local".
        environment = config.get("environment")
        if environment is None:
            if config.get("cloud", False):
                environment = "cloud"
            elif config.get("aws_marketplace", False):
                environment = "aws_marketplace"
            else:
                environment = "local"

        auth_provider = "disabled"
        if config["auth"]["http_auth_enabled"] is True:
            configured_provider = config["auth"].get("provider")
            auth_provider = configured_provider if configured_provider is not None else "local"

        auth_confirmed = False
        auth_type = config["auth"]["http_auth_type"]
        if auth_type in (HTTP_AUTH_TYPE.SESSION, HTTP_AUTH_TYPE.SESSION_OR_TOKEN):
            auth_confirmed = auth_confirmed or check_session_auth()
        if auth_type in (HTTP_AUTH_TYPE.TOKEN, HTTP_AUTH_TYPE.SESSION_OR_TOKEN):
            # Strip only a leading "Bearer " scheme; a header without the
            # scheme is passed through unchanged, as before.
            bearer_prefix = "Bearer "
            token = request.headers.get("Authorization", "")
            if token.startswith(bearer_prefix):
                token = token[len(bearer_prefix):]
            # The token is verified only if the session check did not already confirm.
            auth_confirmed = auth_confirmed or verify_pat(token)

        resp = {
            "mindsdb_version": mindsdb_version,
            "environment": environment,
            "auth": {
                "confirmed": auth_confirmed,
                "http_auth_enabled": config["auth"]["http_auth_enabled"],
                "provider": auth_provider,
            },
        }

        return resp