Coverage for mindsdb / api / http / namespaces / knowledge_bases.py: 43%
210 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from http import HTTPStatus
3from flask import request
4from flask_restx import Resource
5from mindsdb_sql_parser.ast import Identifier
7from mindsdb.api.http.namespaces.configs.projects import ns_conf
8from mindsdb.api.executor.controllers.session_controller import SessionController
9from mindsdb.api.executor.exceptions import ExecutorException
10from mindsdb.api.http.utils import http_error
12from mindsdb.api.mysql.mysql_proxy.classes.fake_mysql_proxy import FakeMysqlProxy
13from mindsdb.integrations.utilities.rag.splitters.file_splitter import FileSplitter, FileSplitterConfig
14from mindsdb.interfaces.file.file_controller import FileController
15from mindsdb.interfaces.knowledge_base.preprocessing.constants import (
16 DEFAULT_CONTEXT_DOCUMENT_LIMIT,
17 DEFAULT_CRAWL_DEPTH,
18 DEFAULT_WEB_FILTERS,
19 DEFAULT_WEB_CRAWL_LIMIT,
20)
21from mindsdb.interfaces.knowledge_base.preprocessing.document_loader import DocumentLoader
22from mindsdb.metrics.metrics import api_endpoint_metrics
23from mindsdb.interfaces.database.projects import ProjectController
24from mindsdb.interfaces.knowledge_base.controller import KnowledgeBaseTable
25from mindsdb.utilities import log
26from mindsdb.utilities.exception import EntityNotExistsError, EntityExistsError
27from mindsdb.integrations.utilities.rag.settings import DEFAULT_LLM_MODEL, DEFAULT_RAG_PROMPT_TEMPLATE
# Module-level logger, named after this module.
logger = log.getLogger(__name__)
@ns_conf.route("/<project_name>/knowledge_bases")
class KnowledgeBasesResource(Resource):
    """Collection endpoint: list and create knowledge bases within a project."""

    @ns_conf.doc("list_knowledge_bases")
    @api_endpoint_metrics("GET", "/knowledge_bases")
    def get(self, project_name):
        """List all knowledge bases in ``project_name``.

        Returns:
            The value of ``kb_controller.list(project_name)``, or a 404 error
            response if the project does not exist.
        """
        session = SessionController()
        project_controller = ProjectController()
        try:
            # Only called to verify existence; the project object itself is not needed.
            project_controller.get(name=project_name)
        except EntityNotExistsError:
            # Project must exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        # KB Controller already returns dict.
        return session.kb_controller.list(project_name)

    @ns_conf.doc("create_knowledge_base")
    @api_endpoint_metrics("POST", "/knowledge_bases")
    def post(self, project_name):
        """Create a knowledge base.

        Expects a JSON body ``{"knowledge_base": {...}}``. The inner object must
        contain ``name`` and may contain:

        - ``storage``: ``{"database": ..., "table": ...}`` (``null`` = not provided),
        - ``params``: object of KB parameters (``null`` = empty),
        - ``preprocessing``: preprocessing configuration,
        - ``embedding_model``, ``reranking_model``, ``content_columns``,
          ``metadata_columns``, ``id_column``: copied into ``params``.

        Returns:
            ``(kb_dict, 201)`` on success. Error responses: 400 for a malformed body
            or a ``ValueError`` from the controller, 404 if the project does not
            exist, 409 if a knowledge base with that name already exists.
        """
        body = request.json
        # Check for required parameters. A JSON body that is not an object (list, null, ...)
        # would otherwise raise TypeError on the lookups below and surface as a 500.
        if not isinstance(body, dict) or "knowledge_base" not in body:
            return http_error(
                HTTPStatus.BAD_REQUEST, "Missing parameter", 'Must provide "knowledge_base" parameter in POST body'
            )

        knowledge_base = body["knowledge_base"]
        if not isinstance(knowledge_base, dict):
            return http_error(HTTPStatus.BAD_REQUEST, "Invalid parameter", '"knowledge_base" must be a JSON object')

        # Only the name is required; embedding model and storage are optional.
        required_fields = ["name"]
        for field in required_fields:
            if field not in knowledge_base:
                return http_error(
                    HTTPStatus.BAD_REQUEST, "Missing parameter", f'Must provide "{field}" field in "knowledge_base"'
                )

        # "storage": null is treated as "not provided", matching how storage is consumed below.
        storage = knowledge_base.get("storage")
        if storage is not None and (
            not isinstance(storage, dict) or "database" not in storage or "table" not in storage
        ):
            return http_error(
                HTTPStatus.BAD_REQUEST,
                "Missing parameter",
                'Must provide "database" and "table" field in "storage" param',
            )

        session = SessionController()
        project_controller = ProjectController()
        try:
            project = project_controller.get(name=project_name)
        except EntityNotExistsError:
            # Project must exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        kb_name = knowledge_base.get("name")
        existing_kb = session.kb_controller.get(kb_name, project.id)
        if existing_kb is not None:
            # Knowledge Base must not exist.
            return http_error(
                HTTPStatus.CONFLICT,
                "Knowledge Base already exists",
                f"Knowledge Base with name {kb_name} already exists",
            )

        embedding_table_identifier = None
        if storage is not None:
            embedding_table_identifier = Identifier(parts=[storage["database"], storage["table"]])

        # "params": null means no params.
        params = knowledge_base.get("params") or {}
        if not isinstance(params, dict):
            return http_error(HTTPStatus.BAD_REQUEST, "Invalid parameter", '"params" must be a JSON object')
        # Copy so merging the top-level optional fields does not mutate the parsed request body.
        params = dict(params)

        optional_parameter_fields = [
            "embedding_model",
            "reranking_model",
            "content_columns",
            "metadata_columns",
            "id_column",
        ]
        # Top-level optional fields take precedence over the same keys inside "params".
        for field in optional_parameter_fields:
            if field in knowledge_base:
                params[field] = knowledge_base[field]

        try:
            new_kb = session.kb_controller.add(
                kb_name,
                project.name,
                embedding_table_identifier,
                params=params,
                preprocessing_config=knowledge_base.get("preprocessing"),
            )
        except ValueError as e:
            return http_error(HTTPStatus.BAD_REQUEST, "Invalid preprocessing configuration", str(e))
        except EntityExistsError as e:
            # Created concurrently after the existence check above; report the same 409 as that check.
            return http_error(HTTPStatus.CONFLICT, "Knowledge base already exists", str(e))

        return new_kb.as_dict(session.show_secrets), HTTPStatus.CREATED
@ns_conf.route("/<project_name>/knowledge_bases/<knowledge_base_name>")
@ns_conf.param("project_name", "Name of the project")
@ns_conf.param("knowledge_base_name", "Name of the knowledge_base")
class KnowledgeBaseResource(Resource):
    """Item endpoint: get, update (with optional data ingestion) and delete one knowledge base."""

    @ns_conf.doc("get_knowledge_base")
    @api_endpoint_metrics("GET", "/knowledge_bases/knowledge_base")
    def get(self, project_name, knowledge_base_name):
        """Gets a knowledge base by name.

        Returns:
            ``(kb_dict, 200)``, or a 404 error response if the project or the
            knowledge base does not exist.
        """
        session = SessionController()
        project_controller = ProjectController()
        try:
            project = project_controller.get(name=project_name)
        except EntityNotExistsError:
            # Project must exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        existing_kb = session.kb_controller.get(knowledge_base_name, project.id)
        if existing_kb is None:
            return http_error(
                HTTPStatus.NOT_FOUND,
                "Knowledge Base not found",
                f"Knowledge Base with name {knowledge_base_name} does not exist",
            )
        return existing_kb.as_dict(session.show_secrets), HTTPStatus.OK

    @ns_conf.doc("update_knowledge_base")
    @api_endpoint_metrics("PUT", "/knowledge_bases/knowledge_base")
    def put(self, project_name: str, knowledge_base_name: str):
        """Updates a knowledge base with optional preprocessing.

        Expects a JSON body ``{"knowledge_base": {...}}``. Recognised keys of the
        inner object: ``params``, ``preprocessing``, ``rows``, ``files``, ``urls``
        (with ``limit``, ``crawl_depth``, ``filters``) and ``query``. Data sources
        are inserted first; afterwards the whitelisted ``params`` keys and
        ``preprocessing`` are persisted via ``kb_controller.update``.

        Returns:
            ``("", 200)`` on success. Error responses: 400 for a malformed body or
            any exception raised while preprocessing/inserting/updating, 404 if the
            project or the knowledge base does not exist.
        """
        body = request.json
        # Check for required parameters. A JSON body that is not an object (list, null, ...)
        # would otherwise raise TypeError here, outside the try below, and surface as a 500.
        if not isinstance(body, dict) or "knowledge_base" not in body:
            return http_error(
                HTTPStatus.BAD_REQUEST, "Missing parameter", 'Must provide "knowledge_base" parameter in PUT body'
            )
        kb_data = body["knowledge_base"]
        if not isinstance(kb_data, dict):
            # Previously this surfaced as a misleading "Preprocessing Error".
            return http_error(HTTPStatus.BAD_REQUEST, "Invalid parameter", '"knowledge_base" must be a JSON object')

        session = SessionController()
        project_controller = ProjectController()

        try:
            project = project_controller.get(name=project_name)
        except EntityNotExistsError:
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        try:
            # Retrieve the knowledge base table for updates
            table = session.kb_controller.get_table(knowledge_base_name, project.id, params=kb_data.get("params"))
            if table is None:
                return http_error(
                    HTTPStatus.NOT_FOUND,
                    "Knowledge Base not found",
                    f"Knowledge Base with name {knowledge_base_name} does not exist",
                )

            # Wire the DocumentLoader the table uses for file/web/query ingestion.
            table.document_loader = DocumentLoader(
                file_controller=FileController(),
                file_splitter=FileSplitter(FileSplitterConfig()),
                mysql_proxy=FakeMysqlProxy(),
            )

            # Update preprocessing configuration if provided
            if "preprocessing" in kb_data:
                table.configure_preprocessing(kb_data["preprocessing"])

            # Process raw data rows if provided
            if kb_data.get("rows"):
                table.insert_rows(kb_data["rows"])

            # Process files if specified
            if kb_data.get("files"):
                table.insert_files(kb_data["files"])

            # Process web pages if URLs provided
            if kb_data.get("urls"):
                table.insert_web_pages(
                    urls=kb_data["urls"],
                    limit=kb_data.get("limit") or DEFAULT_WEB_CRAWL_LIMIT,
                    crawl_depth=kb_data.get("crawl_depth", DEFAULT_CRAWL_DEPTH),
                    filters=kb_data.get("filters", DEFAULT_WEB_FILTERS),
                )

            # Process query if provided
            if kb_data.get("query"):
                table.insert_query_result(kb_data["query"], project_name)

            # Persist only whitelisted KB parameters; other keys in "params" are ignored here.
            update_kb_data = {}
            if "params" in kb_data:
                allowed_keys = [
                    "id_column",
                    "metadata_columns",
                    "content_columns",
                    "preprocessing",
                    "reranking_model",
                    "embedding_model",
                ]
                update_kb_data = {k: v for k, v in kb_data["params"].items() if k in allowed_keys}
            if update_kb_data or "preprocessing" in kb_data:
                session.kb_controller.update(
                    knowledge_base_name,
                    project.name,
                    params=update_kb_data,
                    preprocessing_config=kb_data.get("preprocessing"),
                )

        except ExecutorException as e:
            logger.exception("Error during preprocessing and insertion:")
            return http_error(
                HTTPStatus.BAD_REQUEST,
                "Invalid SELECT query",
                f'Executing "query" failed. Needs to be a valid SELECT statement that returns data: {e}',
            )

        except Exception as e:
            logger.exception("Error during preprocessing and insertion:")
            return http_error(
                HTTPStatus.BAD_REQUEST, "Preprocessing Error", f"Error during preprocessing and insertion: {e}"
            )

        return "", HTTPStatus.OK

    @ns_conf.doc("delete_knowledge_base")
    @api_endpoint_metrics("DELETE", "/knowledge_bases/knowledge_base")
    def delete(self, project_name: str, knowledge_base_name: str):
        """Deletes a knowledge base.

        Returns:
            ``("", 204)`` on success, or a 404 error response if the project or the
            knowledge base does not exist.
        """
        project_controller = ProjectController()
        try:
            project = project_controller.get(name=project_name)
        except EntityNotExistsError:
            # Project must exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        session_controller = SessionController()
        existing_kb = session_controller.kb_controller.get(knowledge_base_name, project.id)
        if existing_kb is None:
            # Knowledge Base must exist.
            return http_error(
                HTTPStatus.NOT_FOUND,
                "Knowledge Base not found",
                f"Knowledge Base with name {knowledge_base_name} does not exist",
            )

        # Note: lookup above uses project.id, deletion takes the project name.
        session_controller.kb_controller.delete(knowledge_base_name, project_name)
        return "", HTTPStatus.NO_CONTENT
def _handle_chat_completion(knowledge_base_table: KnowledgeBaseTable, request):
    """Answer ``query`` from the request body with the knowledge base's RAG pipeline.

    Optional body fields ``llm_model``, ``prompt_template`` and ``model_provider``
    are merged into ``retrieval_config`` as ``llm_model_name``,
    ``rag_prompt_template`` and ``llm_provider`` respectively. Omitted fields are
    simply not set; the warnings logged below state that the defaults are then used.

    Args:
        knowledge_base_table: Table whose ``build_rag_pipeline`` builds the pipeline.
        request: Flask request; its JSON body is assumed to be an object (the
            caller validates this).

    Returns:
        ``{"message": {"content": <answer>, "context": <context>, "role": "assistant"}}``.
    """
    body = request.json
    query = body.get("query")

    llm_model = body.get("llm_model")
    if llm_model is None:
        logger.warning('Missing parameter "llm_model" in POST body, using default llm_model %s', DEFAULT_LLM_MODEL)

    prompt_template = body.get("prompt_template")
    if prompt_template is None:
        logger.warning(
            'Missing parameter "prompt_template" in POST body, using default prompt template %s',
            DEFAULT_RAG_PROMPT_TEMPLATE,
        )

    # Copy the user-supplied config (null -> empty) so the cached request body is not
    # mutated and an explicit "retrieval_config": null does not crash on assignment below.
    retrieval_config = dict(body.get("retrieval_config") or {})
    if not retrieval_config:
        logger.warning("No retrieval config provided, using default retrieval config")

    # add llm model to retrieval config
    if llm_model is not None:
        retrieval_config["llm_model_name"] = llm_model

    # add prompt template to retrieval config
    if prompt_template is not None:
        retrieval_config["rag_prompt_template"] = prompt_template

    # add llm provider to retrieval config if set
    llm_provider = body.get("model_provider")
    if llm_provider is not None:
        retrieval_config["llm_provider"] = llm_provider

    # build rag pipeline
    rag_pipeline = knowledge_base_table.build_rag_pipeline(retrieval_config)

    # get response from rag pipeline
    rag_response = rag_pipeline(query)
    return {
        "message": {"content": rag_response.get("answer"), "context": rag_response.get("context"), "role": "assistant"}
    }
def _handle_context_completion(knowledge_base_table: KnowledgeBaseTable, request):
    """Return the top documents of a hybrid search over the knowledge base.

    Body fields: ``query`` (semantic search), ``keywords`` (keyword search),
    ``metadata`` (metadata search) and ``limit`` (maximum number of documents;
    missing or ``null`` means ``DEFAULT_CONTEXT_DOCUMENT_LIMIT``, and a
    non-positive value yields no documents).

    Args:
        knowledge_base_table: Table whose ``hybrid_search`` returns a DataFrame with
            at least ``id``, ``content`` and ``rank`` columns.
        request: Flask request; its JSON body is assumed to be an object (the
            caller validates this).

    Returns:
        ``{"documents": [{"id": ..., "content": ..., "rank": ...}, ...]}`` in the
        order returned by ``hybrid_search``.
    """
    body = request.json
    # Used for semantic search.
    query = body.get("query")
    # Keyword search.
    keywords = body.get("keywords")
    # Metadata search.
    metadata = body.get("metadata")
    # Maximum amount of documents to return as context. An explicit null previously
    # crashed range(); treat it like an omitted value.
    limit = body.get("limit")
    if limit is None:
        limit = DEFAULT_CONTEXT_DOCUMENT_LIMIT

    # Use default distance function & column names for ID, content, & metadata, to keep things simple.
    hybrid_search_df = knowledge_base_table.hybrid_search(query, keywords=keywords, metadata=metadata)

    # range() of a non-positive count is empty, so negative limits return no documents.
    num_documents = min(limit, len(hybrid_search_df.index))
    # Positional iloc access keeps the same per-row value types as before.
    rows = (hybrid_search_df.iloc[position] for position in range(num_documents))
    context_documents = [{"id": row["id"], "content": row["content"], "rank": row["rank"]} for row in rows]

    return {"documents": context_documents}
@ns_conf.route("/<project_name>/knowledge_bases/<knowledge_base_name>/completions")
@ns_conf.param("project_name", "Name of the project")
@ns_conf.param("knowledge_base_name", "Name of the knowledge_base")
class KnowledgeBaseCompletions(Resource):
    """Completion endpoint: retrieval-only ("context") or LLM-generated ("chat") answers over a knowledge base."""

    @ns_conf.doc("knowledge_base_completions")
    @api_endpoint_metrics("POST", "/knowledge_bases/knowledge_base/completions")
    def post(self, project_name, knowledge_base_name):
        """
        Add support for LLM generation on the response from knowledge base. Default completion type is 'chat' unless specified.

        The JSON body must be an object with a non-null "query". "type" selects
        "context" (hybrid-search documents) or "chat" (RAG answer); any other value
        yields a 400.
        """
        body = request.json
        # "query" is used for semantic search for both completion types. A JSON body that
        # is not an object cannot carry it (and would otherwise raise AttributeError -> 500).
        if not isinstance(body, dict) or body.get("query") is None:
            logger.error('Missing parameter "query" in POST body')
            return http_error(
                HTTPStatus.BAD_REQUEST, "Missing parameter", 'Must provide "query" parameter in POST body'
            )

        project_controller = ProjectController()
        try:
            project = project_controller.get(name=project_name)
        except EntityNotExistsError:
            # Project must exist.
            logger.error("Project not found, please check the project name exists")
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        session = SessionController()
        # Check if knowledge base exists
        table = session.kb_controller.get_table(knowledge_base_name, project.id)
        if table is None:
            logger.error("Knowledge Base not found, please check the knowledge base name exists")
            return http_error(
                HTTPStatus.NOT_FOUND,
                "Knowledge Base not found",
                f"Knowledge Base with name {knowledge_base_name} does not exist",
            )

        completion_type = body.get("type", "chat")
        if completion_type == "context":
            return _handle_context_completion(table, request)
        if completion_type == "chat":
            return _handle_chat_completion(table, request)
        return http_error(
            HTTPStatus.BAD_REQUEST,
            "Invalid parameter",
            f'Completion type must be one of: "context", "chat". Received {completion_type}',
        )