Coverage for mindsdb / api / http / namespaces / default.py: 26%

78 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from flask import request, session 

2from flask_restx import Resource 

3from flask_restx import fields 

4 

5from mindsdb.__about__ import __version__ as mindsdb_version 

6from mindsdb.api.http.namespaces.configs.default import ns_conf 

7from mindsdb.api.http.utils import http_error 

8from mindsdb.metrics.metrics import api_endpoint_metrics 

9from mindsdb.utilities.config import config, HTTP_AUTH_TYPE 

10from mindsdb.utilities import log 

11from mindsdb.api.common.middleware import generate_pat, revoke_pat, verify_pat 

12 

13 

14logger = log.getLogger(__name__) 

15 

16 

17def check_session_auth() -> bool: 

18 """checking whether current user is authenticated 

19 

20 Returns: 

21 bool: True if user authentication is approved 

22 """ 

23 try: 

24 if config["auth"]["http_auth_enabled"] is False: 

25 return True 

26 return session.get("username") == config["auth"]["username"] 

27 except Exception: 

28 return False 

29 

30 

31@ns_conf.route("/login", methods=["POST"]) 

32class LoginRoute(Resource): 

33 @ns_conf.doc( 

34 responses={200: "Success", 400: "Error in username or password", 401: "Invalid username or password"}, 

35 body=ns_conf.model( 

36 "request_login", 

37 {"username": fields.String(description="Username"), "password": fields.String(description="Password")}, 

38 ), 

39 ) 

40 @api_endpoint_metrics("POST", "/default/login") 

41 def post(self): 

42 """Check user's credentials and creates a session""" 

43 username = request.json.get("username") 

44 password = request.json.get("password") 

45 if ( 

46 isinstance(username, str) is False 

47 or len(username) == 0 

48 or isinstance(password, str) is False 

49 or len(password) == 0 

50 ): 

51 return http_error(400, "Error in username or password", "Username and password should be string") 

52 

53 inline_username = config["auth"]["username"] 

54 inline_password = config["auth"]["password"] 

55 

56 if username != inline_username or password != inline_password: 

57 return http_error(401, "Forbidden", "Invalid username or password") 

58 

59 logger.info(f"User '{username}' logged in successfully") 

60 

61 response = {} 

62 if config["auth"]["http_auth_type"] in (HTTP_AUTH_TYPE.SESSION, HTTP_AUTH_TYPE.SESSION_OR_TOKEN): 

63 session.clear() 

64 session["username"] = username 

65 session.permanent = True 

66 

67 if config["auth"]["http_auth_type"] in (HTTP_AUTH_TYPE.TOKEN, HTTP_AUTH_TYPE.SESSION_OR_TOKEN): 

68 response["token"] = generate_pat() 

69 

70 return response, 200 

71 

72 

73@ns_conf.route("/logout", methods=["POST"]) 

74class LogoutRoute(Resource): 

75 @ns_conf.doc(responses={200: "Success"}) 

76 @api_endpoint_metrics("POST", "/default/logout") 

77 def post(self): 

78 session.clear() 

79 # We can't forcibly log out a user with the 

80 h = request.headers.get("Authorization") 

81 if not h or not h.startswith("Bearer "): 

82 bearer = None 

83 else: 

84 bearer = h.split(" ", 1)[1].strip() or None 

85 revoke_pat(bearer) 

86 return "", 200 

87 

88 

89@ns_conf.route("/status") 

90class StatusRoute(Resource): 

91 @ns_conf.doc( 

92 responses={200: "Success"}, 

93 model=ns_conf.model( 

94 "response_status", 

95 { 

96 "environment": fields.String(description="The name of current environment: cloud, local or other"), 

97 "mindsdb_version": fields.String(description="Current version of mindsdb"), 

98 "auth": fields.Nested( 

99 ns_conf.model( 

100 "response_status_auth", 

101 { 

102 "confirmed": fields.Boolean(description="is current user authenticated"), 

103 "required": fields.Boolean(description="is authenticated required"), 

104 "provider": fields.Boolean(description="current authenticated provider: local of 3d-party"), 

105 }, 

106 ) 

107 ), 

108 }, 

109 ), 

110 ) 

111 @api_endpoint_metrics("GET", "/default/status") 

112 def get(self): 

113 """returns auth and environment data""" 

114 environment = "local" 

115 

116 environment = config.get("environment") 

117 if environment is None: 

118 if config.get("cloud", False): 

119 environment = "cloud" 

120 elif config.get("aws_marketplace", False): 

121 environment = "aws_marketplace" 

122 else: 

123 environment = "local" 

124 

125 auth_provider = "disabled" 

126 if config["auth"]["http_auth_enabled"] is True: 

127 if config["auth"].get("provider") is not None: 

128 auth_provider = config["auth"].get("provider") 

129 else: 

130 auth_provider = "local" 

131 

132 auth_confirmed = False 

133 auth_type = config["auth"]["http_auth_type"] 

134 if auth_type in (HTTP_AUTH_TYPE.SESSION, HTTP_AUTH_TYPE.SESSION_OR_TOKEN): 

135 auth_confirmed = auth_confirmed or check_session_auth() 

136 if auth_type in (HTTP_AUTH_TYPE.TOKEN, HTTP_AUTH_TYPE.SESSION_OR_TOKEN): 

137 auth_confirmed = auth_confirmed or verify_pat( 

138 request.headers.get("Authorization", "").replace("Bearer ", "") 

139 ) 

140 

141 resp = { 

142 "mindsdb_version": mindsdb_version, 

143 "environment": environment, 

144 "auth": { 

145 "confirmed": auth_confirmed, 

146 "http_auth_enabled": config["auth"]["http_auth_enabled"], 

147 "provider": auth_provider, 

148 }, 

149 } 

150 

151 return resp