Coverage for mindsdb / api / http / namespaces / handlers.py: 47%

205 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2import tempfile 

3import importlib 

4import multipart 

5from pathlib import Path 

6from http import HTTPStatus 

7 

8from flask import request, send_file, current_app as ca 

9from flask_restx import Resource 

10 

11from mindsdb_sql_parser.ast import Identifier 

12from mindsdb_sql_parser.ast.mindsdb import CreateMLEngine 

13 

14from mindsdb.metrics.metrics import api_endpoint_metrics 

15from mindsdb.integrations.libs.ml_exec_base import process_cache 

16from mindsdb.integrations.utilities.install import install_dependencies 

17from mindsdb.interfaces.storage.model_fs import HandlerStorage 

18from mindsdb.api.http.utils import http_error 

19from mindsdb.api.http.namespaces.configs.handlers import ns_conf 

20from mindsdb.api.executor.controllers.session_controller import SessionController 

21from mindsdb.api.executor.command_executor import ExecuteCommands 

22from mindsdb.utilities.exception import EntityExistsError 

23from mindsdb.utilities import log 

24 

25logger = log.getLogger(__name__) 

26 

27 

28def _resolve_handler_readme_path(handler_folder: str) -> Path: 

29 handler_folder_name = Path(handler_folder).name 

30 if handler_folder_name != handler_folder or ".." in handler_folder: 

31 raise ValueError(f"Handler folder '{handler_folder}' is invalid.") 

32 

33 mindsdb_path = Path(importlib.util.find_spec("mindsdb").origin).parent 

34 base_handlers_path = mindsdb_path.joinpath("integrations/handlers").resolve() 

35 readme_path = base_handlers_path.joinpath(handler_folder_name).joinpath("README.md").resolve() 

36 

37 if base_handlers_path not in readme_path.parents: 

38 raise ValueError(f"Handler folder '{handler_folder}' is invalid.") 

39 

40 return readme_path 

41 

42 

43@ns_conf.route("/") 

44class HandlersList(Resource): 

45 @ns_conf.doc("handlers_list") 

46 @api_endpoint_metrics("GET", "/handlers") 

47 def get(self): 

48 """List all db handlers""" 

49 

50 if request.args.get("lazy") == "1": 

51 handlers = ca.integration_controller.get_handlers_metadata() 

52 else: 

53 handlers = ca.integration_controller.get_handlers_import_status() 

54 result = [] 

55 for handler_type, handler_meta in handlers.items(): 

56 # remove non-integration handlers 

57 if handler_type not in ["utilities", "dummy_data"]: 

58 row = {"name": handler_type} 

59 row.update(handler_meta) 

60 del row["path"] 

61 result.append(row) 

62 return result 

63 

64 

65@ns_conf.route("/<handler_name>/icon") 

66class HandlerIcon(Resource): 

67 @ns_conf.param("handler_name", "Handler name") 

68 @api_endpoint_metrics("GET", "/handlers/handler/icon") 

69 def get(self, handler_name): 

70 try: 

71 handler_meta = ca.integration_controller.get_handlers_metadata().get(handler_name) 

72 if handler_meta is None: 

73 return http_error(HTTPStatus.NOT_FOUND, "Icon not found", f"Icon for {handler_name} not found") 

74 icon_name = handler_meta["icon"]["name"] 

75 handler_folder = handler_meta["import"]["folder"] 

76 mindsdb_path = Path(importlib.util.find_spec("mindsdb").origin).parent 

77 icon_path = mindsdb_path.joinpath("integrations/handlers").joinpath(handler_folder).joinpath(icon_name) 

78 if icon_path.is_absolute() is False: 

79 icon_path = Path(os.getcwd()).joinpath(icon_path) 

80 except Exception: 

81 error_message = f"Icon for '{handler_name}' not found" 

82 logger.warning(error_message) 

83 return http_error(HTTPStatus.NOT_FOUND, "Icon not found", error_message) 

84 else: 

85 return send_file(icon_path) 

86 

87 

88@ns_conf.route("/<handler_name>") 

89class HandlerInfo(Resource): 

90 @ns_conf.param("handler_name", "Handler name") 

91 @api_endpoint_metrics("GET", "/handlers/handler") 

92 def get(self, handler_name): 

93 handler_meta = ca.integration_controller.get_handler_meta(handler_name) 

94 row = {"name": handler_name} 

95 row.update(handler_meta) 

96 del row["path"] 

97 del row["icon"] 

98 return row 

99 

100 

101@ns_conf.route("/<handler_name>/readme") 

102class HandlerReadme(Resource): 

103 @ns_conf.param("handler_name", "Handler name") 

104 @api_endpoint_metrics("GET", "/handlers/handler/readme") 

105 def get(self, handler_name): 

106 try: 

107 handler_meta = ca.integration_controller.get_handler_meta(handler_name) 

108 except Exception: 

109 return http_error( 

110 HTTPStatus.NOT_FOUND, 

111 "Readme not found", 

112 f"Handler '{handler_name}' not found", 

113 ) 

114 

115 def make_response(*, error_message=None, readme=None): 

116 return {"name": handler_name, "readme": readme, "error_message": error_message} 

117 

118 if handler_meta is None: 

119 error_message = f"Handler '{handler_name}' not found" 

120 logger.warning(error_message) 

121 return make_response(error_message=error_message) 

122 

123 handler_folder = handler_meta.get("import", {}).get("folder") 

124 if handler_folder is None: 

125 error_message = f"Handler '{handler_name}' does not define a folder" 

126 logger.warning(error_message) 

127 return make_response(error_message=error_message) 

128 

129 try: 

130 readme_path = _resolve_handler_readme_path(handler_folder) 

131 except ValueError as exc: 

132 error_message = str(exc) 

133 logger.warning(error_message) 

134 return make_response(error_message=error_message) 

135 

136 try: 

137 with open(readme_path, "r", encoding="utf-8") as readme_file: 

138 readme_content = readme_file.read() 

139 except FileNotFoundError: 

140 error_message = f"README.md for handler '{handler_name}' not found" 

141 logger.warning(error_message) 

142 return make_response(error_message=error_message) 

143 

144 return make_response(readme=readme_content) 

145 

146 

147@ns_conf.route("/<handler_name>/install") 

148class InstallDependencies(Resource): 

149 @ns_conf.param("handler_name", "Handler name") 

150 @api_endpoint_metrics("POST", "/handlers/handler/install") 

151 def post(self, handler_name): 

152 handler_meta = ca.integration_controller.get_handler_meta(handler_name) 

153 

154 if handler_meta is None: 

155 return f"Unknown handler: {handler_name}", 400 

156 

157 if handler_meta.get("import", {}).get("success", False) is True: 

158 return "Installed", 200 

159 

160 dependencies = handler_meta["import"]["dependencies"] 

161 if len(dependencies) == 0: 

162 return "Installed", 200 

163 

164 result = install_dependencies(dependencies) 

165 

166 # reload it if any result, so we can get new error message 

167 ca.integration_controller.reload_handler_module(handler_name) 

168 if result.get("success") is True: 

169 # If warm processes are available in the cache, remove them. 

170 # This will force a new process to be created with the installed dependencies. 

171 process_cache.remove_processes_for_handler(handler_name) 

172 return "", 200 

173 return http_error( 

174 500, 

175 f"Failed to install dependencies for {handler_meta.get('title', handler_name)}", 

176 result.get("error_message", "unknown error"), 

177 ) 

178 

179 

180def prepare_formdata(): 

181 params = {} 

182 file_names = [] 

183 

184 def on_field(field): 

185 name = field.field_name.decode() 

186 value = field.value.decode() 

187 params[name] = value 

188 

189 def on_file(file): 

190 file_name = file.file_name.decode() 

191 if Path(file_name).name != file_name: 

192 raise ValueError(f"Wrong file name: {file_name}") 

193 

194 field_name = file.field_name.decode() 

195 if field_name not in ("code", "modules"): 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true

196 raise ValueError(f"Wrong field name: {field_name}") 

197 

198 params[field_name] = file.file_object 

199 file_names.append(field_name) 

200 

201 temp_dir_path = tempfile.mkdtemp(prefix="mindsdb_file_") 

202 

203 parser = multipart.create_form_parser( 

204 headers=request.headers, 

205 on_field=on_field, 

206 on_file=on_file, 

207 config={ 

208 "UPLOAD_DIR": temp_dir_path.encode(), # bytes required 

209 "UPLOAD_KEEP_FILENAME": True, 

210 "UPLOAD_KEEP_EXTENSIONS": True, 

211 "MAX_MEMORY_FILE_SIZE": float("inf"), 

212 }, 

213 ) 

214 

215 while True: 

216 chunk = request.stream.read(8192) 

217 if not chunk: 

218 break 

219 parser.write(chunk) 

220 parser.finalize() 

221 parser.close() 

222 

223 for file_name in file_names: 

224 file_path = os.path.join(temp_dir_path, file_name) 

225 with open(file_path, "wb") as f: 

226 params[file_name].seek(0) 

227 f.write(params[file_name].read()) 

228 params[file_name].close() 

229 params[file_name] = file_path 

230 

231 return params 

232 

233 

234@ns_conf.route("/byom/<name>") 

235@ns_conf.param("name", "Name of the model") 

236class BYOMUpload(Resource): 

237 @ns_conf.doc("post_file") 

238 @api_endpoint_metrics("POST", "/handlers/byom/handler") 

239 def post(self, name): 

240 params = prepare_formdata() 

241 

242 code_file_path = params["code"] 

243 try: 

244 module_file_path = params["modules"] 

245 except KeyError: 

246 module_file_path = Path(code_file_path).parent / "requirements.txt" 

247 module_file_path.touch() 

248 module_file_path = str(module_file_path) 

249 

250 connection_args = {"code": code_file_path, "modules": module_file_path, "type": params.get("type")} 

251 

252 session = SessionController() 

253 

254 base_ml_handler = session.integration_controller.get_ml_handler(name) 

255 base_ml_handler.update_engine(connection_args) 

256 

257 engine_storage = HandlerStorage(base_ml_handler.integration_id) 

258 

259 engine_versions = [int(x) for x in engine_storage.get_connection_args()["versions"].keys()] 

260 

261 return {"last_engine_version": max(engine_versions), "engine_versions": engine_versions} 

262 

263 @ns_conf.doc("put_file") 

264 @api_endpoint_metrics("PUT", "/handlers/byom/handler") 

265 def put(self, name): 

266 """upload new model 

267 params in FormData: 

268 - code 

269 - modules 

270 """ 

271 

272 params = prepare_formdata() 

273 

274 code_file_path = params["code"] 

275 try: 

276 module_file_path = params["modules"] 

277 except KeyError: 

278 module_file_path = Path(code_file_path).parent / "requirements.txt" 

279 module_file_path.touch() 

280 module_file_path = str(module_file_path) 

281 

282 connection_args = { 

283 "code": code_file_path, 

284 "modules": module_file_path, 

285 "mode": params.get("mode"), 

286 "type": params.get("type"), 

287 } 

288 

289 ast_query = CreateMLEngine(name=Identifier(name), handler="byom", params=connection_args) 

290 sql_session = SessionController() 

291 command_executor = ExecuteCommands(sql_session) 

292 try: 

293 command_executor.execute_command(ast_query) 

294 except EntityExistsError: 

295 return http_error( 

296 HTTPStatus.CONFLICT, 

297 "Engine already exists", 

298 f'Engine "{name}" already exists', 

299 ) 

300 

301 return "", 200