Coverage for mindsdb / api / http / namespaces / config.py: 22%

180 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import copy 

2import shutil 

3import tempfile 

4from pathlib import Path 

5from http import HTTPStatus 

6 

7from flask import request 

8from flask_restx import Resource 

9from flask import current_app as ca 

10 

11from mindsdb.api.http.namespaces.configs.config import ns_conf 

12from mindsdb.api.http.utils import http_error 

13from mindsdb.metrics.metrics import api_endpoint_metrics 

14from mindsdb.utilities.api_status import get_api_status 

15from mindsdb.utilities import log 

16from mindsdb.utilities.functions import decrypt, encrypt 

17from mindsdb.utilities.config import Config 

18from mindsdb.integrations.libs.response import HandlerStatusResponse 

19 

20 

21logger = log.getLogger(__name__) 

22 

23 

24@ns_conf.route("/") 

25@ns_conf.param("name", "Get config") 

26class GetConfig(Resource): 

27 @ns_conf.doc("get_config") 

28 @api_endpoint_metrics("GET", "/config") 

29 def get(self): 

30 config = Config() 

31 resp = {"auth": {"http_auth_enabled": config["auth"]["http_auth_enabled"]}} 

32 for key in ["default_llm", "default_embedding_model", "default_reranking_model"]: 

33 value = config.get(key) 

34 if value is not None: 

35 resp[key] = value 

36 

37 api_status = get_api_status() 

38 api_configs = copy.deepcopy(config["api"]) 

39 for api_name, api_config in api_configs.items(): 

40 api_config["running"] = api_status.get(api_name, False) 

41 resp["api"] = api_configs 

42 

43 return resp 

44 

45 @ns_conf.doc("put_config") 

46 @api_endpoint_metrics("PUT", "/config") 

47 def put(self): 

48 data = request.json 

49 

50 allowed_arguments = {"auth", "default_llm", "default_embedding_model", "default_reranking_model"} 

51 unknown_arguments = list(set(data.keys()) - allowed_arguments) 

52 if len(unknown_arguments) > 0: 

53 return http_error(HTTPStatus.BAD_REQUEST, "Wrong arguments", f"Unknown argumens: {unknown_arguments}") 

54 

55 nested_keys_to_validate = {"auth"} 

56 for key in data.keys(): 

57 if key in nested_keys_to_validate: 

58 unknown_arguments = list(set(data[key].keys()) - set(Config()[key].keys())) 

59 if len(unknown_arguments) > 0: 

60 return http_error( 

61 HTTPStatus.BAD_REQUEST, "Wrong arguments", f"Unknown argumens: {unknown_arguments}" 

62 ) 

63 

64 overwrite_arguments = {"default_llm", "default_embedding_model", "default_reranking_model"} 

65 overwrite_data = {k: data[k] for k in overwrite_arguments if k in data} 

66 merge_data = {k: data[k] for k in data if k not in overwrite_arguments} 

67 

68 if len(overwrite_data) > 0: 

69 Config().update(overwrite_data, overwrite=True) 

70 if len(merge_data) > 0: 

71 Config().update(merge_data) 

72 

73 Config().update(data) 

74 

75 return "", 200 

76 

77 

78@ns_conf.route("/integrations") 

79@ns_conf.param("name", "List all database integration") 

80class ListIntegration(Resource): 

81 @api_endpoint_metrics("GET", "/config/integrations") 

82 def get(self): 

83 return {"integrations": [k for k in ca.integration_controller.get_all(show_secrets=False)]} 

84 

85 

86@ns_conf.route("/all_integrations") 

87@ns_conf.param("name", "List all database integration") 

88class AllIntegration(Resource): 

89 @ns_conf.doc("get_all_integrations") 

90 @api_endpoint_metrics("GET", "/config/all_integrations") 

91 def get(self): 

92 integrations = ca.integration_controller.get_all(show_secrets=False) 

93 return integrations 

94 

95 

96@ns_conf.route("/integrations/<name>") 

97@ns_conf.param("name", "Database integration") 

98class Integration(Resource): 

99 @ns_conf.doc("get_integration") 

100 @api_endpoint_metrics("GET", "/config/integrations/integration") 

101 def get(self, name): 

102 integration = ca.integration_controller.get(name, show_secrets=False) 

103 if integration is None: 

104 return http_error(HTTPStatus.NOT_FOUND, "Not found", f"Can't find integration: {name}") 

105 integration = copy.deepcopy(integration) 

106 return integration 

107 

108 @ns_conf.doc("put_integration") 

109 @api_endpoint_metrics("PUT", "/config/integrations/integration") 

110 def put(self, name): 

111 params = {} 

112 if request.is_json: 

113 params.update((request.json or {}).get("params", {})) 

114 else: 

115 params.update(request.form or {}) 

116 

117 if len(params) == 0: 

118 return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", "type of 'params' must be dict") 

119 

120 files = request.files 

121 temp_dir = None 

122 if files is not None and len(files) > 0: 

123 temp_dir = tempfile.mkdtemp(prefix="integration_files_") 

124 for key, file in files.items(): 

125 temp_dir_path = Path(temp_dir) 

126 file_name = Path(file.filename) 

127 file_path = temp_dir_path.joinpath(file_name).resolve() 

128 if temp_dir_path not in file_path.parents: 

129 raise Exception(f"Can not save file at path: {file_path}") 

130 file.save(file_path) 

131 params[key] = str(file_path) 

132 

133 is_test = params.get("test", False) 

134 # TODO: Move this to new Endpoint 

135 

136 config = Config() 

137 secret_key = config.get("secret_key", "dummy-key") 

138 

139 if is_test: 

140 del params["test"] 

141 handler_type = params.pop("type", None) 

142 params.pop("publish", None) 

143 try: 

144 handler = ca.integration_controller.create_tmp_handler(name, handler_type, params) 

145 status = handler.check_connection() 

146 except ImportError as e: 

147 status = HandlerStatusResponse(success=False, error_message=str(e)) 

148 if temp_dir is not None: 

149 shutil.rmtree(temp_dir) 

150 

151 resp = status.to_json() 

152 

153 if status.success and "code" in params: 

154 if hasattr(handler, "handler_storage"): 

155 # attach storage if exists 

156 export = handler.handler_storage.export_files() 

157 if export: 

158 # encrypt with flask secret key 

159 encrypted = encrypt(export, secret_key) 

160 resp["storage"] = encrypted.decode() 

161 

162 return resp, 200 

163 

164 config = Config() 

165 secret_key = config.get("secret_key", "dummy-key") 

166 

167 integration = ca.integration_controller.get(name, show_secrets=False) 

168 if integration is not None: 

169 return http_error( 

170 HTTPStatus.BAD_REQUEST, "Wrong argument", f"Integration with name '{name}' already exists" 

171 ) 

172 

173 try: 

174 engine = params.pop("type", None) 

175 params.pop("publish", False) 

176 storage = params.pop("storage", None) 

177 ca.integration_controller.add(name, engine, params) 

178 

179 # copy storage 

180 if storage is not None: 

181 handler = ca.integration_controller.get_data_handler(name) 

182 

183 export = decrypt(storage.encode(), secret_key) 

184 handler.handler_storage.import_files(export) 

185 

186 except Exception as e: 

187 logger.exception("An error occurred during the creation of the integration:") 

188 if temp_dir is not None: 

189 shutil.rmtree(temp_dir) 

190 return http_error(HTTPStatus.INTERNAL_SERVER_ERROR, "Error", f"Error during config update: {e}") 

191 

192 if temp_dir is not None: 

193 shutil.rmtree(temp_dir) 

194 return {}, 200 

195 

196 @ns_conf.doc("delete_integration") 

197 @api_endpoint_metrics("DELETE", "/config/integrations/integration") 

198 def delete(self, name): 

199 integration = ca.integration_controller.get(name) 

200 if integration is None: 

201 return http_error( 

202 HTTPStatus.BAD_REQUEST, "Integration does not exists", f"Nothing to delete. '{name}' not exists." 

203 ) 

204 try: 

205 ca.integration_controller.delete(name) 

206 except Exception as e: 

207 logger.exception("An error occurred while deleting the integration") 

208 return http_error(HTTPStatus.INTERNAL_SERVER_ERROR, "Error", f"Error during integration delete: {e}") 

209 return "", 200 

210 

211 @ns_conf.doc("modify_integration") 

212 @api_endpoint_metrics("POST", "/config/integrations/integration") 

213 def post(self, name): 

214 params = {} 

215 params.update((request.json or {}).get("params", {})) 

216 params.update(request.form or {}) 

217 

218 if not isinstance(params, dict): 

219 return http_error(HTTPStatus.BAD_REQUEST, "Wrong argument", "type of 'params' must be dict") 

220 integration = ca.integration_controller.get(name) 

221 if integration is None: 

222 return http_error( 

223 HTTPStatus.BAD_REQUEST, "Integration does not exists", f"Nothin to modify. '{name}' not exists." 

224 ) 

225 try: 

226 if "enabled" in params: 

227 params["publish"] = params["enabled"] 

228 del params["enabled"] 

229 ca.integration_controller.modify(name, params) 

230 

231 except Exception as e: 

232 logger.exception("An error occurred while modifying the integration") 

233 return http_error(HTTPStatus.INTERNAL_SERVER_ERROR, "Error", f"Error during integration modification: {e}") 

234 return "", 200