Coverage for mindsdb / api / http / namespaces / agents.py: 61%
236 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2import json
3from http import HTTPStatus
4from typing import Dict, Iterable, List
6from flask import request, Response
7from flask_restx import Resource
9from mindsdb.interfaces.agents.agents_controller import AgentsController
10from mindsdb.interfaces.storage import db
11from mindsdb.api.http.utils import http_error
12from mindsdb.api.http.namespaces.configs.projects import ns_conf
13from mindsdb.api.executor.controllers.session_controller import SessionController
14from mindsdb.metrics.metrics import api_endpoint_metrics
15from mindsdb.utilities.log import getLogger
16from mindsdb.utilities.exception import EntityExistsError, EntityNotExistsError
# Module-level logger, keyed by this module's __name__.
logger = getLogger(__name__)

# Canned acknowledgement text. Nothing in the code visible in this module
# references it -- presumably imported elsewhere; verify before removing.
AGENT_QUICK_RESPONSE = "I understand your request. I'm working on a detailed response for you."
def create_agent(project_name, name, agent):
    """Create a new agent in a project and build the HTTP response for it.

    Args:
        project_name: Name of the project that will own the agent.
        name: Name of the new agent. ``None`` is rejected with BAD_REQUEST.
        agent: The "agent" object from the request body. The keys read here
            are "model_name", "provider", "skills", "params", "data",
            "model" and "prompt_template".

    Returns:
        ``(agent_dict, HTTPStatus.CREATED)`` on success, otherwise whatever
        ``http_error`` returns for the failure (BAD_REQUEST, NOT_FOUND,
        CONFLICT or UNAUTHORIZED).
    """
    if name is None:
        return http_error(HTTPStatus.BAD_REQUEST, "Missing field", 'Missing "name" field for agent')

    model_name = agent.get("model_name")
    provider = agent.get("provider")
    skills = agent.get("skills", [])

    # `or {}` also covers an explicit `"params": null` in the payload, which
    # would otherwise fail on the item assignments below.
    params = agent.get("params") or {}
    # Top-level shortcut keys override the same keys inside "params".
    for key in ("data", "model", "prompt_template"):
        if agent.get(key):
            params[key] = agent[key]

    agents_controller = AgentsController()

    try:
        existing_agent = agents_controller.get_agent(name, project_name=project_name)
    except (ValueError, EntityNotExistsError):
        # Project must exist.
        return http_error(HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist")
    if existing_agent is not None:
        return http_error(
            HTTPStatus.CONFLICT,
            "Agent already exists",
            f"Agent with name {name} already exists. Please choose a different one.",
        )

    try:
        created_agent = agents_controller.add_agent(
            name=name, project_name=project_name, model_name=model_name, skills=skills, provider=provider, params=params
        )
        return created_agent.as_dict(), HTTPStatus.CREATED
    except (ValueError, EntityExistsError):
        # Model or skill doesn't exist.
        return http_error(
            HTTPStatus.NOT_FOUND,
            "Resource not found",
            f'The model "{model_name}" or skills "{skills}" do not exist. Please ensure that the names are correct and try again.',
        )
    except NotImplementedError:
        # Free users trying to create agent. The detail text used to be a
        # copy of the "not found" message above, which contradicted the title.
        return http_error(
            HTTPStatus.UNAUTHORIZED,
            "Unavailable to free users",
            "Creating agents is unavailable to free users.",
        )
@ns_conf.route("/<project_name>/agents")
class AgentsResource(Resource):
    """Collection endpoint for the agents of a single project."""

    @ns_conf.doc("list_agents")
    @api_endpoint_metrics("GET", "/agents")
    def get(self, project_name):
        """List all agents"""
        session = SessionController()
        try:
            agents = session.agents_controller.get_agents(project_name)
        except EntityNotExistsError:
            # The project has to exist before its agents can be listed.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        serialized = [record.as_dict() for record in agents]
        return serialized

    @ns_conf.doc("create_agent")
    @api_endpoint_metrics("POST", "/agents")
    def post(self, project_name):
        """Create an agent"""
        payload = request.json

        # The body must wrap the agent definition in an "agent" key.
        if "agent" not in payload:
            return http_error(
                HTTPStatus.BAD_REQUEST, "Missing parameter", 'Must provide "agent" parameter in POST body'
            )

        agent = payload["agent"]
        # create_agent rejects a missing name itself, so pass it through as-is.
        return create_agent(project_name, agent.get("name"), agent)
@ns_conf.route("/<project_name>/agents/<agent_name>")
@ns_conf.param("project_name", "Name of the project")
@ns_conf.param("agent_name", "Name of the agent")
class AgentResource(Resource):
    """Item endpoint: read, update or delete one agent of a project."""

    @staticmethod
    def _includes_retrieval_skill(skills_controller, skills):
        """Return True if any listed skill exists and has type "retrieval".

        Each entry of ``skills`` may be a plain skill name or a dict with a
        "name" key. Skills the controller does not know are ignored.
        """
        for entry in skills:
            skill_name = entry["name"] if isinstance(entry, dict) else entry
            # One lookup per skill (the result is reused for the type check).
            # NOTE(review): get_skill is called without a project name here;
            # confirm it resolves skills in the intended project.
            skill = skills_controller.get_skill(skill_name)
            if skill is not None and skill.type == "retrieval":
                return True
        return False

    @ns_conf.doc("get_agent")
    @api_endpoint_metrics("GET", "/agents/agent")
    def get(self, project_name, agent_name):
        """Gets an agent by name"""
        session = SessionController()
        try:
            existing_agent = session.agents_controller.get_agent(agent_name, project_name=project_name)
            if existing_agent is None:
                return http_error(
                    HTTPStatus.NOT_FOUND, "Agent not found", f"Agent with name {agent_name} does not exist"
                )
            return existing_agent.as_dict()
        except (ValueError, EntityNotExistsError):
            # Project needs to exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

    @ns_conf.doc("update_agent")
    @api_endpoint_metrics("PUT", "/agents/agent")
    def put(self, project_name, agent_name):
        """Updates an existing agent by name (creation through PUT is rejected)"""

        # Check for required parameters.
        if "agent" not in request.json:
            return http_error(
                HTTPStatus.BAD_REQUEST, "Missing parameter", 'Must provide "agent" parameter in POST body'
            )
        agents_controller = AgentsController()

        try:
            existing_agent_record = agents_controller.get_agent(agent_name, project_name=project_name)
        except (ValueError, EntityNotExistsError):
            # Project must exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )
        if existing_agent_record is None:
            # PUT only updates. Because of this early return, the former
            # "create if missing" branch further down could never run and
            # has been removed.
            return http_error(
                HTTPStatus.BAD_REQUEST,
                "Creation is not allowed",
                "Creation of an agent using the PUT method is not allowed.",
            )

        agent = request.json["agent"]
        name = agent.get("name", None)

        # Agent must not exist with new name.
        if name is not None and name != agent_name:
            agent_with_new_name = agents_controller.get_agent(name, project_name=project_name)
            if agent_with_new_name is not None:
                return http_error(
                    HTTPStatus.CONFLICT,
                    "Agent already exists",
                    f"Agent with name {name} already exists. Please choose a different one.",
                )

        # `or` fallbacks also cover explicit nulls in the payload.
        model_name = agent.get("model_name", None)
        skills_to_add = agent.get("skills_to_add") or []
        skills_to_remove = agent.get("skills_to_remove") or []
        skills_to_rewrite = agent.get("skills") or []
        provider = agent.get("provider")
        params = agent.get("params") or {}
        # Top-level shortcut keys override the same keys inside "params".
        for key in ("data", "model", "prompt_template"):
            if agent.get(key):
                params[key] = agent[key]

        try:
            # Default the agent to retrieval mode when a retrieval skill is
            # being attached and the caller did not choose a mode. Added
            # skills take precedence over the rewrite list, as before.
            skills_controller = SessionController().skills_controller
            skills_to_check = skills_to_add if skills_to_add else skills_to_rewrite
            retrieval_skill_added = self._includes_retrieval_skill(skills_controller, skills_to_check)

            if retrieval_skill_added and "mode" not in params:
                params["mode"] = "retrieval"

            updated_agent = agents_controller.update_agent(
                agent_name,
                project_name=project_name,
                name=name,
                model_name=model_name,
                skills_to_add=skills_to_add,
                skills_to_remove=skills_to_remove,
                skills_to_rewrite=skills_to_rewrite,
                provider=provider,
                params=params,
            )

            return updated_agent.as_dict()
        except EntityExistsError as e:
            # NOTE(review): NOT_FOUND for an "already exists" error looks
            # inconsistent with the CONFLICT used above; kept as-is because
            # clients may depend on the status code -- confirm.
            return http_error(HTTPStatus.NOT_FOUND, "Resource should not exists", str(e))
        except EntityNotExistsError as e:
            # Agent or skill doesn't exist.
            return http_error(HTTPStatus.NOT_FOUND, "Resource not found", str(e))
        except ValueError as e:
            return http_error(HTTPStatus.BAD_REQUEST, "Wrong arguments", str(e))

    @ns_conf.doc("delete_agent")
    @api_endpoint_metrics("DELETE", "/agents/agent")
    def delete(self, project_name, agent_name):
        """Deletes an agent by name"""
        agents_controller = AgentsController()

        try:
            existing_agent = agents_controller.get_agent(agent_name, project_name=project_name)
            if existing_agent is None:
                return http_error(
                    HTTPStatus.NOT_FOUND, "Agent not found", f"Agent with name {agent_name} does not exist"
                )
        except (ValueError, EntityNotExistsError):
            # Project needs to exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        agents_controller.delete_agent(agent_name, project_name=project_name)
        return "", HTTPStatus.NO_CONTENT
def _completion_event_generator(agent_name: str, messages: List[Dict], project_name: str) -> Iterable[str]:
    """Yield server-sent-event strings for a streamed agent completion.

    Every yielded item has the form ``"data: <json>\\n\\n"``. Chunks from the
    completion stream that already start with ``"data: "`` are passed through
    unchanged. Any exception is logged and reported as a single
    ``{"error": ...}`` event. The stream is terminated by a
    ``{"type": "end"}`` event unless the consumer closes the generator first.

    Args:
        agent_name: Name of the agent to query.
        messages: Chat messages forwarded to the agent.
        project_name: Project the agent belongs to.
    """
    logger.info(f"Starting completion event generator for agent {agent_name}")

    def json_serialize(data):
        return f"data: {json.dumps(data)}\n\n"

    try:
        # Populate API key by default if not present.
        session = SessionController()
        existing_agent = session.agents_controller.get_agent(agent_name, project_name=project_name)
        if not existing_agent.params:
            existing_agent.params = {}
        existing_agent.params["openai_api_key"] = existing_agent.params.get(
            "openai_api_key", os.getenv("OPENAI_API_KEY")
        )
        # Have to commit/flush here so DB isn't locked while streaming.
        db.session.commit()

        # Set after the commit above, exactly as before.
        if "mode" not in existing_agent.params and any(
            rel.skill.type == "retrieval" for rel in existing_agent.skills_relationships
        ):
            existing_agent.params["mode"] = "retrieval"

        completion_stream = session.agents_controller.get_completion(
            existing_agent, messages, project_name=project_name, tools=[], stream=True
        )

        for chunk in completion_stream:
            if isinstance(chunk, str) and chunk.startswith("data: "):
                # The chunk is already formatted correctly, yield it as is
                yield chunk
            elif isinstance(chunk, dict):
                if "error" in chunk:
                    # Handle error chunks
                    logger.error(f"Error in completion stream: {chunk['error']}")
                    yield json_serialize({"error": chunk["error"]})
                elif chunk.get("type") in ("context", "sql"):
                    # Context and SQL messages are reduced to type + content.
                    yield json_serialize({"type": chunk["type"], "content": chunk.get("content")})
                else:
                    # Chunk should already be formatted by agent stream.
                    yield json_serialize(chunk)
            else:
                # For any other unexpected chunk types
                yield json_serialize({"output": str(chunk)})

            logger.debug(f"Streamed chunk: {str(chunk)[:100]}...")

        logger.info("Completion stream finished")

    except Exception:
        error_message = "Error in completion event generator"
        logger.exception(error_message)
        yield json_serialize({"error": error_message})

    # The end marker is emitted here rather than from a `finally` clause:
    # yielding from `finally` while the generator is being closed (e.g. the
    # client disconnected) raises "RuntimeError: generator ignored
    # GeneratorExit". Normal completion and handled errors still reach it.
    yield json_serialize({"type": "end"})
@ns_conf.route("/<project_name>/agents/<agent_name>/completions/stream")
@ns_conf.param("project_name", "Name of the project")
@ns_conf.param("agent_name", "Name of the agent")
class AgentCompletionsStream(Resource):
    """Streaming completions endpoint (``text/event-stream``) for one agent."""

    @ns_conf.doc("agent_completions_stream")
    @api_endpoint_metrics("POST", "/agents/agent/completions/stream")
    def post(self, project_name, agent_name):
        """Streams an agent's completion for a list of messages"""
        # Extract messages from request (HTTP format only)
        if "messages" not in request.json:
            return http_error(
                HTTPStatus.BAD_REQUEST,
                "Missing parameter",
                'Must provide "messages" parameter in POST body',
            )

        messages = request.json["messages"]

        session = SessionController()
        try:
            existing_agent = session.agents_controller.get_agent(agent_name, project_name=project_name)
            if existing_agent is None:
                logger.warning(f"Agent {agent_name} not found in project {project_name}")
                return http_error(
                    HTTPStatus.NOT_FOUND, "Agent not found", f"Agent with name {agent_name} does not exist"
                )
        except (ValueError, EntityNotExistsError) as e:
            # Same exception pair as the other agent endpoints in this file;
            # previously only ValueError was handled here.
            logger.warning(f"Project {project_name} not found: {e}")
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        try:
            # Calling the generator function does not run its body; errors
            # raised while streaming are reported by the generator itself as
            # events inside the stream.
            gen = _completion_event_generator(agent_name, messages, project_name)
            logger.info(f"Starting streaming response for agent {agent_name}")
            return Response(gen, mimetype="text/event-stream")
        except Exception as e:
            logger.exception(f"Error during streaming for agent {agent_name}:")
            return http_error(
                HTTPStatus.INTERNAL_SERVER_ERROR, "Streaming error", f"An error occurred during streaming: {e}"
            )
@ns_conf.route("/<project_name>/agents/<agent_name>/completions")
@ns_conf.param("project_name", "Name of the project")
@ns_conf.param("agent_name", "Name of the agent")
class AgentCompletions(Resource):
    """Non-streaming completions endpoint for one agent."""

    @ns_conf.doc("agent_completions")
    @api_endpoint_metrics("POST", "/agents/agent/completions")
    def post(self, project_name, agent_name):
        """Queries an agent given a list of messages"""
        # Check for required parameters.
        if "messages" not in request.json:
            return http_error(
                HTTPStatus.BAD_REQUEST, "Missing parameter", 'Must provide "messages" parameter in POST body'
            )
        agents_controller = AgentsController()

        try:
            existing_agent = agents_controller.get_agent(agent_name, project_name=project_name)
            if existing_agent is None:
                return http_error(
                    HTTPStatus.NOT_FOUND, "Agent not found", f"Agent with name {agent_name} does not exist"
                )
        except (ValueError, EntityNotExistsError):
            # Project needs to exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        # An agent stored without params would break the membership test and
        # the .get() below; the streaming generator in this module applies a
        # similar guard before reading params.
        if existing_agent.params is None:
            existing_agent.params = {}

        # set mode to `retrieval` if agent has a skill of type `retrieval` and mode is not set
        if "mode" not in existing_agent.params and any(
            rel.skill.type == "retrieval" for rel in existing_agent.skills_relationships
        ):
            existing_agent.params["mode"] = "retrieval"

        messages = request.json["messages"]

        completion = agents_controller.get_completion(
            existing_agent,
            messages,
            project_name=project_name,
            # Don't need to include backoffice_db related tools into this endpoint.
            # Underlying handler (e.g. Langchain) will handle default tools like mdb_read, mdb_write, etc.
            tools=[],
        )

        # The answer is taken from the last row of the completion result.
        output_col = agents_controller.assistant_column
        last_row = completion.iloc[-1]
        model_output = last_row[output_col]
        trace_id = last_row["trace_id"]

        response = {"message": {"content": model_output, "role": "assistant", "trace_id": trace_id}}

        if existing_agent.params.get("return_context", False):
            context = []
            if "context" in completion.columns:
                try:
                    last_context = completion.iloc[-1]["context"]
                    if last_context:
                        context = json.loads(last_context)
                except (json.JSONDecodeError, TypeError, IndexError):
                    # TypeError: the stored context was not a str/bytes value.
                    # Context stays an empty list in case of error.
                    logger.warning("Error decoding context:", exc_info=True)

            response["message"]["context"] = context

        return response