Coverage for mindsdb / api / http / namespaces / knowledge_bases.py: 43%

210 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from http import HTTPStatus 

2 

3from flask import request 

4from flask_restx import Resource 

5from mindsdb_sql_parser.ast import Identifier 

6 

7from mindsdb.api.http.namespaces.configs.projects import ns_conf 

8from mindsdb.api.executor.controllers.session_controller import SessionController 

9from mindsdb.api.executor.exceptions import ExecutorException 

10from mindsdb.api.http.utils import http_error 

11 

12from mindsdb.api.mysql.mysql_proxy.classes.fake_mysql_proxy import FakeMysqlProxy 

13from mindsdb.integrations.utilities.rag.splitters.file_splitter import FileSplitter, FileSplitterConfig 

14from mindsdb.interfaces.file.file_controller import FileController 

15from mindsdb.interfaces.knowledge_base.preprocessing.constants import ( 

16 DEFAULT_CONTEXT_DOCUMENT_LIMIT, 

17 DEFAULT_CRAWL_DEPTH, 

18 DEFAULT_WEB_FILTERS, 

19 DEFAULT_WEB_CRAWL_LIMIT, 

20) 

21from mindsdb.interfaces.knowledge_base.preprocessing.document_loader import DocumentLoader 

22from mindsdb.metrics.metrics import api_endpoint_metrics 

23from mindsdb.interfaces.database.projects import ProjectController 

24from mindsdb.interfaces.knowledge_base.controller import KnowledgeBaseTable 

25from mindsdb.utilities import log 

26from mindsdb.utilities.exception import EntityNotExistsError, EntityExistsError 

27from mindsdb.integrations.utilities.rag.settings import DEFAULT_LLM_MODEL, DEFAULT_RAG_PROMPT_TEMPLATE 

28 

29 

30logger = log.getLogger(__name__) 

31 

32 

@ns_conf.route("/<project_name>/knowledge_bases")
class KnowledgeBasesResource(Resource):
    """Collection endpoint: list a project's knowledge bases or create a new one."""

    @ns_conf.doc("list_knowledge_bases")
    @api_endpoint_metrics("GET", "/knowledge_bases")
    def get(self, project_name):
        """List all knowledge bases"""
        session = SessionController()
        project_controller = ProjectController()
        try:
            # Only the existence of the project matters here; the object itself is unused.
            _ = project_controller.get(name=project_name)
        except EntityNotExistsError:
            # Project must exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        # KB Controller already returns dict.
        return session.kb_controller.list(project_name)

    @ns_conf.doc("create_knowledge_base")
    @api_endpoint_metrics("POST", "/knowledge_bases")
    def post(self, project_name):
        """Create a knowledge base

        The JSON body must contain a "knowledge_base" object with at least a "name".
        An optional "storage" object must carry both "database" and "table".
        The top-level fields "embedding_model", "reranking_model", "content_columns",
        "metadata_columns" and "id_column" are copied into "params", overriding
        same-named keys already present there.

        Responds with:
            400 - "knowledge_base", "name" or a "storage" field is missing, or the
                  controller rejects the request (ValueError / EntityExistsError).
            404 - the project does not exist.
            409 - a knowledge base with this name already exists in the project.
            201 - the created knowledge base as a dict.
        """

        # Check for required parameters.
        if "knowledge_base" not in request.json:
            return http_error(
                HTTPStatus.BAD_REQUEST, "Missing parameter", 'Must provide "knowledge_base" parameter in POST body'
            )

        knowledge_base = request.json["knowledge_base"]
        # Only the name is mandatory; everything else is optional.
        required_fields = ["name"]
        for field in required_fields:
            if field not in knowledge_base:
                return http_error(
                    HTTPStatus.BAD_REQUEST, "Missing parameter", f'Must provide "{field}" field in "knowledge_base"'
                )

        # An explicit `"storage": null` means "not provided", matching the identifier
        # construction below. Validate only a storage object that is actually present.
        storage = knowledge_base.get("storage")
        if storage is not None and ("database" not in storage or "table" not in storage):
            return http_error(
                HTTPStatus.BAD_REQUEST,
                "Missing parameter",
                'Must provide "database" and "table" field in "storage" param',
            )

        session = SessionController()
        project_controller = ProjectController()
        try:
            project = project_controller.get(name=project_name)
        except EntityNotExistsError:
            # Project must exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        kb_name = knowledge_base.get("name")
        existing_kb = session.kb_controller.get(kb_name, project.id)
        if existing_kb is not None:
            # Knowledge Base must not exist.
            return http_error(
                HTTPStatus.CONFLICT,
                "Knowledge Base already exists",
                f"Knowledge Base with name {kb_name} already exists",
            )

        embedding_table_identifier = None
        if storage is not None:
            embedding_table_identifier = Identifier(parts=[storage["database"], storage["table"]])

        # `"params": null` would otherwise yield None and break the item assignment below.
        params = knowledge_base.get("params") or {}

        optional_parameter_fields = [
            "embedding_model",
            "reranking_model",
            "content_columns",
            "metadata_columns",
            "id_column",
        ]

        # Top-level shortcuts take precedence over same-named keys inside "params".
        for field in optional_parameter_fields:
            if field in knowledge_base:
                params[field] = knowledge_base[field]

        try:
            new_kb = session.kb_controller.add(
                kb_name,
                project.name,
                embedding_table_identifier,
                params=params,
                preprocessing_config=knowledge_base.get("preprocessing"),
            )
        except ValueError as e:
            return http_error(HTTPStatus.BAD_REQUEST, "Invalid preprocessing configuration", str(e))
        except EntityExistsError as e:
            # NOTE(review): the pre-check above answers 409 CONFLICT for the same condition;
            # the status is kept at 400 here so the existing HTTP contract does not change.
            return http_error(HTTPStatus.BAD_REQUEST, "Knowledge base already exists", str(e))

        return new_kb.as_dict(session.show_secrets), HTTPStatus.CREATED

137 

138 

@ns_conf.route("/<project_name>/knowledge_bases/<knowledge_base_name>")
@ns_conf.param("project_name", "Name of the project")
@ns_conf.param("knowledge_base_name", "Name of the knowledge_base")
class KnowledgeBaseResource(Resource):
    """Item endpoint: read, update (including data ingestion) or delete one knowledge base."""

    @staticmethod
    def _project_not_found(project_name):
        """Build the 404 response used when the project cannot be resolved."""
        return http_error(
            HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
        )

    @staticmethod
    def _knowledge_base_not_found(knowledge_base_name):
        """Build the 404 response used when the knowledge base cannot be resolved."""
        return http_error(
            HTTPStatus.NOT_FOUND,
            "Knowledge Base not found",
            f"Knowledge Base with name {knowledge_base_name} does not exist",
        )

    @ns_conf.doc("get_knowledge_base")
    @api_endpoint_metrics("GET", "/knowledge_bases/knowledge_base")
    def get(self, project_name, knowledge_base_name):
        """Gets a knowledge base by name"""
        session = SessionController()
        projects = ProjectController()
        try:
            project = projects.get(name=project_name)
        except EntityNotExistsError:
            # The project has to exist before its knowledge bases can be looked up.
            return self._project_not_found(project_name)

        kb = session.kb_controller.get(knowledge_base_name, project.id)
        if kb is None:
            return self._knowledge_base_not_found(knowledge_base_name)
        return kb.as_dict(session.show_secrets), HTTPStatus.OK

    @ns_conf.doc("update_knowledge_base")
    @api_endpoint_metrics("PUT", "/knowledge_bases/knowledge_base")
    def put(self, project_name: str, knowledge_base_name: str):
        """Updates a knowledge base with optional preprocessing.

        Reads the "knowledge_base" object from the JSON body and, in this order:
        applies "preprocessing", inserts "rows", "files", crawled "urls" and the
        result of "query", then forwards the whitelisted "params" (and
        "preprocessing") to the controller's update. Any exception raised along
        the way is logged and reported as a 400 response.
        """

        # The payload wrapper is mandatory.
        if "knowledge_base" not in request.json:
            return http_error(
                HTTPStatus.BAD_REQUEST, "Missing parameter", 'Must provide "knowledge_base" parameter in PUT body'
            )

        session = SessionController()
        projects = ProjectController()

        try:
            project = projects.get(name=project_name)
        except EntityNotExistsError:
            return self._project_not_found(project_name)

        try:
            kb_data = request.json["knowledge_base"]

            # Resolve the table object that all insert operations go through.
            kb_table = session.kb_controller.get_table(
                knowledge_base_name, project.id, params=kb_data.get("params")
            )
            if kb_table is None:
                return self._knowledge_base_not_found(knowledge_base_name)

            # Wire the table to a loader built from the file controller, splitter and SQL proxy.
            kb_table.document_loader = DocumentLoader(
                file_controller=FileController(),
                file_splitter=FileSplitter(FileSplitterConfig()),
                mysql_proxy=FakeMysqlProxy(),
            )

            # Preprocessing has to be configured before any content is inserted.
            if "preprocessing" in kb_data:
                kb_table.configure_preprocessing(kb_data["preprocessing"])

            # Raw rows.
            rows = kb_data.get("rows")
            if rows:
                kb_table.insert_rows(rows)

            # Files.
            files = kb_data.get("files")
            if files:
                kb_table.insert_files(files)

            # Web pages, crawled starting from the given URLs.
            urls = kb_data.get("urls")
            if urls:
                kb_table.insert_web_pages(
                    urls=urls,
                    limit=kb_data.get("limit") or DEFAULT_WEB_CRAWL_LIMIT,
                    crawl_depth=kb_data.get("crawl_depth", DEFAULT_CRAWL_DEPTH),
                    filters=kb_data.get("filters", DEFAULT_WEB_FILTERS),
                )

            # Result set of a query.
            query = kb_data.get("query")
            if query:
                kb_table.insert_query_result(query, project_name)

            # Only a fixed set of parameters may be changed through this endpoint.
            updatable_keys = (
                "id_column",
                "metadata_columns",
                "content_columns",
                "preprocessing",
                "reranking_model",
                "embedding_model",
            )
            kb_updates = {}
            if "params" in kb_data:
                kb_updates = {
                    key: value for key, value in kb_data["params"].items() if key in updatable_keys
                }
            if kb_updates or "preprocessing" in kb_data:
                session.kb_controller.update(
                    knowledge_base_name,
                    project.name,
                    params=kb_updates,
                    preprocessing_config=kb_data.get("preprocessing"),
                )

        except ExecutorException as e:
            logger.exception("Error during preprocessing and insertion:")
            return http_error(
                HTTPStatus.BAD_REQUEST,
                "Invalid SELECT query",
                f'Executing "query" failed. Needs to be a valid SELECT statement that returns data: {e}',
            )

        except Exception as e:
            logger.exception("Error during preprocessing and insertion:")
            return http_error(
                HTTPStatus.BAD_REQUEST, "Preprocessing Error", f"Error during preprocessing and insertion: {e}"
            )

        return "", HTTPStatus.OK

    @ns_conf.doc("delete_knowledge_base")
    @api_endpoint_metrics("DELETE", "/knowledge_bases/knowledge_base")
    def delete(self, project_name: str, knowledge_base_name: str):
        """Deletes a knowledge base."""
        projects = ProjectController()
        try:
            project = projects.get(name=project_name)
        except EntityNotExistsError:
            # Nothing can be deleted from a project that does not exist.
            return self._project_not_found(project_name)

        session = SessionController()
        if session.kb_controller.get(knowledge_base_name, project.id) is None:
            # Deleting an unknown knowledge base is reported as 404.
            return self._knowledge_base_not_found(knowledge_base_name)

        session.kb_controller.delete(knowledge_base_name, project_name)
        return "", HTTPStatus.NO_CONTENT

301 

302 

def _handle_chat_completion(knowledge_base_table: KnowledgeBaseTable, request):
    """Answer the request's "query" through the knowledge base's RAG pipeline.

    Reads "query", "llm_model", "prompt_template", "retrieval_config" and
    "model_provider" from the JSON body. The optional values are folded into the
    retrieval config under "llm_model_name", "rag_prompt_template" and
    "llm_provider" before the pipeline is built.

    Args:
        knowledge_base_table: table object providing ``build_rag_pipeline``.
        request: the current request; only ``request.json`` is read.

    Returns:
        ``{"message": {"content": ..., "context": ..., "role": "assistant"}}`` where
        content and context come from the pipeline response's "answer" and
        "context" entries.
    """
    payload = request.json
    query = payload.get("query")

    llm_model = payload.get("llm_model")
    if llm_model is None:
        logger.warning(f'Missing parameter "llm_model" in POST body, using default llm_model {DEFAULT_LLM_MODEL}')

    prompt_template = payload.get("prompt_template")
    if prompt_template is None:
        logger.warning(
            f'Missing parameter "prompt_template" in POST body, using default prompt template {DEFAULT_RAG_PROMPT_TEMPLATE}'
        )

    # Get retrieval config, if set. An explicit JSON null is normalised to an empty
    # config: `.get(key, {})` would hand back None and the assignments below would fail.
    retrieval_config = payload.get("retrieval_config") or {}
    if not retrieval_config:
        logger.warning("No retrieval config provided, using default retrieval config")

    # add llm model to retrieval config
    if llm_model is not None:
        retrieval_config["llm_model_name"] = llm_model

    # add prompt template to retrieval config
    if prompt_template is not None:
        retrieval_config["rag_prompt_template"] = prompt_template

    # add llm provider to retrieval config if set
    llm_provider = payload.get("model_provider")
    if llm_provider is not None:
        retrieval_config["llm_provider"] = llm_provider

    # build rag pipeline
    rag_pipeline = knowledge_base_table.build_rag_pipeline(retrieval_config)

    # get response from rag pipeline
    rag_response = rag_pipeline(query)
    return {
        "message": {"content": rag_response.get("answer"), "context": rag_response.get("context"), "role": "assistant"}
    }

345 

346 

def _handle_context_completion(knowledge_base_table: KnowledgeBaseTable, request):
    """Return the top documents of a hybrid search as completion context.

    Reads "query" (semantic search), "keywords" (keyword search), "metadata"
    (metadata search) and "limit" from the JSON body.

    Args:
        knowledge_base_table: table object providing ``hybrid_search``.
        request: the current request; only ``request.json`` is read.

    Returns:
        ``{"documents": [...]}`` with at most ``limit`` entries, each holding the
        "id", "content" and "rank" of one result row, in result order.
    """
    payload = request.json
    # Used for semantic search.
    query = payload.get("query")
    # Keyword search.
    keywords = payload.get("keywords")
    # Metadata search.
    metadata = payload.get("metadata")
    # Maximum amount of documents to return as context. A missing key and an explicit
    # JSON null both select the default; an explicit 0 still means "no documents".
    limit = payload.get("limit")
    if limit is None:
        limit = DEFAULT_CONTEXT_DOCUMENT_LIMIT

    # Use default distance function & column names for ID, content, & metadata, to keep things simple.
    hybrid_search_df = knowledge_base_table.hybrid_search(query, keywords=keywords, metadata=metadata)

    # Never read past the end of the result set.
    num_documents = min(limit, len(hybrid_search_df.index))
    rows = (hybrid_search_df.iloc[i] for i in range(num_documents))
    context_documents = [{"id": row["id"], "content": row["content"], "rank": row["rank"]} for row in rows]

    return {"documents": context_documents}

369 

370 

@ns_conf.route("/<project_name>/knowledge_bases/<knowledge_base_name>/completions")
@ns_conf.param("project_name", "Name of the project")
@ns_conf.param("knowledge_base_name", "Name of the knowledge_base")
class KnowledgeBaseCompletions(Resource):
    """Completions endpoint of a single knowledge base."""

    @ns_conf.doc("knowledge_base_completions")
    @api_endpoint_metrics("POST", "/knowledge_bases/knowledge_base/completions")
    def post(self, project_name, knowledge_base_name):
        """
        Produce a completion from the knowledge base for the given "query".

        The "type" field of the body selects the completion: "context" returns the
        matching documents, "chat" (the default) returns an LLM-generated answer.
        """
        payload = request.json
        if payload.get("query") is None:
            # Both completion types run their semantic search on "query".
            logger.error('Missing parameter "query" in POST body')
            return http_error(
                HTTPStatus.BAD_REQUEST, "Missing parameter", 'Must provide "query" parameter in POST body'
            )

        projects = ProjectController()
        try:
            project = projects.get(name=project_name)
        except EntityNotExistsError:
            # The project has to exist.
            logger.error("Project not found, please check the project name exists")
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        session = SessionController()
        # The knowledge base has to exist as well.
        kb_table = session.kb_controller.get_table(knowledge_base_name, project.id)
        if kb_table is None:
            logger.error("Knowledge Base not found, please check the knowledge base name exists")
            return http_error(
                HTTPStatus.NOT_FOUND,
                "Knowledge Base not found",
                f"Knowledge Base with name {knowledge_base_name} does not exist",
            )

        completion_type = request.json.get("type", "chat")
        if completion_type not in ("context", "chat"):
            return http_error(
                HTTPStatus.BAD_REQUEST,
                "Invalid parameter",
                f'Completion type must be one of: "context", "chat". Received {completion_type}',
            )

        if completion_type == "context":
            return _handle_context_completion(kb_table, request)
        return _handle_chat_completion(kb_table, request)

415 HTTPStatus.BAD_REQUEST, 

416 "Invalid parameter", 

417 f'Completion type must be one of: "context", "chat". Received {completion_type}', 

418 )