Coverage for mindsdb / api / http / namespaces / agents.py: 61%

236 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2import json 

3from http import HTTPStatus 

4from typing import Dict, Iterable, List 

5 

6from flask import request, Response 

7from flask_restx import Resource 

8 

9from mindsdb.interfaces.agents.agents_controller import AgentsController 

10from mindsdb.interfaces.storage import db 

11from mindsdb.api.http.utils import http_error 

12from mindsdb.api.http.namespaces.configs.projects import ns_conf 

13from mindsdb.api.executor.controllers.session_controller import SessionController 

14from mindsdb.metrics.metrics import api_endpoint_metrics 

15from mindsdb.utilities.log import getLogger 

16from mindsdb.utilities.exception import EntityExistsError, EntityNotExistsError 

17 

18 

# Module-level logger, named after this module via getLogger(__name__).
19logger = getLogger(__name__) 

20 

21 

# Canned acknowledgement text.
# NOTE(review): no reference to this constant is visible in the reviewed code — confirm it is still needed.
22AGENT_QUICK_RESPONSE = "I understand your request. I'm working on a detailed response for you." 

23 

24 

def create_agent(project_name, name, agent):
    """Create a new agent in a project and build the HTTP response for it.

    Args:
        project_name: Name of the project the agent is created in.
        name: Name for the new agent. ``None`` produces a 400 response.
        agent: Agent definition from the request body. The keys read here are
            ``model_name``, ``provider``, ``skills``, ``params``, ``data``,
            ``model`` and ``prompt_template``.

    Returns:
        ``(agent_dict, HTTPStatus.CREATED)`` on success, otherwise whatever
        ``http_error`` returns for the failure that occurred.
    """
    if name is None:
        return http_error(HTTPStatus.BAD_REQUEST, "Missing field", 'Missing "name" field for agent')

    model_name = agent.get("model_name")
    provider = agent.get("provider")
    skills = agent.get("skills", [])

    # ``or {}`` also covers an explicit ``"params": null`` in the payload,
    # which would otherwise fail on the item assignments below.
    params = agent.get("params") or {}
    # Top-level shortcuts are folded into params when they carry a value.
    for key in ("data", "model", "prompt_template"):
        if agent.get(key):
            params[key] = agent[key]

    agents_controller = AgentsController()

    try:
        existing_agent = agents_controller.get_agent(name, project_name=project_name)
    except (ValueError, EntityNotExistsError):
        # Project must exist.
        return http_error(HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist")
    if existing_agent is not None:
        return http_error(
            HTTPStatus.CONFLICT,
            "Agent already exists",
            f"Agent with name {name} already exists. Please choose a different one.",
        )

    try:
        created_agent = agents_controller.add_agent(
            name=name, project_name=project_name, model_name=model_name, skills=skills, provider=provider, params=params
        )
        return created_agent.as_dict(), HTTPStatus.CREATED
    except (ValueError, EntityExistsError):
        # Model or skill doesn't exist.
        return http_error(
            HTTPStatus.NOT_FOUND,
            "Resource not found",
            f'The model "{model_name}" or skills "{skills}" do not exist. Please ensure that the names are correct and try again.',
        )
    except NotImplementedError:
        # Free users trying to create agent. The detail text used to be a
        # copy of the "do not exist" message above, which misreported the cause.
        return http_error(
            HTTPStatus.UNAUTHORIZED,
            "Unavailable to free users",
            "Creating agents is unavailable to free users.",
        )

74 

75 

# Collection endpoint: lists the agents of a project and creates new ones.
@ns_conf.route("/<project_name>/agents")
class AgentsResource(Resource):
    @ns_conf.doc("list_agents")
    @api_endpoint_metrics("GET", "/agents")
    def get(self, project_name):
        """List all agents"""
        session = SessionController()
        try:
            project_agents = session.agents_controller.get_agents(project_name)
        except EntityNotExistsError:
            # The lookup fails this way when the project itself is missing.
            title = "Project not found"
            detail = f"Project with name {project_name} does not exist"
            return http_error(HTTPStatus.NOT_FOUND, title, detail)
        else:
            return [record.as_dict() for record in project_agents]

    @ns_conf.doc("create_agent")
    @api_endpoint_metrics("POST", "/agents")
    def post(self, project_name):
        """Create a agent"""
        payload = request.json

        # The body must wrap the definition in an "agent" key.
        if "agent" in payload:
            definition = payload["agent"]
            return create_agent(project_name, definition.get("name"), definition)

        return http_error(
            HTTPStatus.BAD_REQUEST,
            "Missing parameter",
            'Must provide "agent" parameter in POST body',
        )

107 

108 

@ns_conf.route("/<project_name>/agents/<agent_name>")
@ns_conf.param("project_name", "Name of the project")
@ns_conf.param("agent_name", "Name of the agent")
class AgentResource(Resource):
    # Item endpoint: read, update and delete a single agent of a project.

    @staticmethod
    def _has_retrieval_skill(skills_controller, skills):
        """Return True if any entry in ``skills`` is an existing skill of type "retrieval".

        Entries may be plain skill names or dicts carrying a ``"name"`` key.
        Each skill is looked up once; names that resolve to ``None`` are skipped.
        """
        for entry in skills:
            skill_name = entry["name"] if isinstance(entry, dict) else entry
            skill = skills_controller.get_skill(skill_name)
            if skill is not None and skill.type == "retrieval":
                return True
        return False

    @ns_conf.doc("get_agent")
    @api_endpoint_metrics("GET", "/agents/agent")
    def get(self, project_name, agent_name):
        """Gets an agent by name"""
        session = SessionController()
        try:
            existing_agent = session.agents_controller.get_agent(agent_name, project_name=project_name)
            if existing_agent is None:
                return http_error(
                    HTTPStatus.NOT_FOUND, "Agent not found", f"Agent with name {agent_name} does not exist"
                )
            return existing_agent.as_dict()
        except (ValueError, EntityNotExistsError):
            # Project needs to exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

    @ns_conf.doc("update_agent")
    @api_endpoint_metrics("PUT", "/agents/agent")
    def put(self, project_name, agent_name):
        """Updates an existing agent by name.

        Creating an agent through PUT is rejected with 400; renaming to a name
        that is already taken is rejected with 409.
        """

        # Check for required parameters.
        if "agent" not in request.json:
            return http_error(
                HTTPStatus.BAD_REQUEST, "Missing parameter", 'Must provide "agent" parameter in POST body'
            )
        agents_controller = AgentsController()

        try:
            existing_agent_record = agents_controller.get_agent(agent_name, project_name=project_name)
        except (ValueError, EntityNotExistsError):
            # Project must exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )
        if existing_agent_record is None:
            return http_error(
                HTTPStatus.BAD_REQUEST,
                "Creation is not allowed",
                "Creation of an agent using the PUT method is not allowed.",
            )

        agent = request.json["agent"]
        name = agent.get("name", None)

        # Agent must not exist with new name.
        if name is not None and name != agent_name:
            agent_with_new_name = agents_controller.get_agent(name, project_name=project_name)
            if agent_with_new_name is not None:
                return http_error(
                    HTTPStatus.CONFLICT,
                    "Agent already exists",
                    f"Agent with name {name} already exists. Please choose a different one.",
                )

        # Update. (A former "create if missing" branch here was unreachable:
        # a missing agent has already been answered with 400 above.)
        try:
            model_name = agent.get("model_name", None)
            skills_to_add = agent.get("skills_to_add", [])
            skills_to_remove = agent.get("skills_to_remove", [])
            skills_to_rewrite = agent.get("skills", [])
            provider = agent.get("provider")
            # ``or {}`` also covers an explicit ``"params": null``.
            params = agent.get("params") or {}
            for key in ("data", "model", "prompt_template"):
                if agent.get(key):
                    params[key] = agent[key]

            # Check if any of the skills to be added is of type 'retrieval'.
            # Skills to add take precedence; the rewrite list is only
            # inspected when nothing is being added.
            skills_controller = SessionController().skills_controller
            retrieval_skill_added = False
            if skills_to_add:
                retrieval_skill_added = self._has_retrieval_skill(skills_controller, skills_to_add)
            elif skills_to_rewrite:
                retrieval_skill_added = self._has_retrieval_skill(skills_controller, skills_to_rewrite)

            if retrieval_skill_added and "mode" not in params:
                params["mode"] = "retrieval"

            updated_agent = agents_controller.update_agent(
                agent_name,
                project_name=project_name,
                name=name,
                model_name=model_name,
                skills_to_add=skills_to_add,
                skills_to_remove=skills_to_remove,
                skills_to_rewrite=skills_to_rewrite,
                provider=provider,
                params=params,
            )

            return updated_agent.as_dict()
        except EntityExistsError as e:
            return http_error(HTTPStatus.NOT_FOUND, "Resource should not exists", str(e))
        except EntityNotExistsError as e:
            # Agent or skill doesn't exist.
            return http_error(HTTPStatus.NOT_FOUND, "Resource not found", str(e))
        except ValueError as e:
            return http_error(HTTPStatus.BAD_REQUEST, "Wrong arguments", str(e))

    @ns_conf.doc("delete_agent")
    @api_endpoint_metrics("DELETE", "/agents/agent")
    def delete(self, project_name, agent_name):
        """Deletes an agent by name"""
        agents_controller = AgentsController()

        try:
            existing_agent = agents_controller.get_agent(agent_name, project_name=project_name)
            if existing_agent is None:
                return http_error(
                    HTTPStatus.NOT_FOUND, "Agent not found", f"Agent with name {agent_name} does not exist"
                )
        except (ValueError, EntityNotExistsError):
            # Project needs to exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        agents_controller.delete_agent(agent_name, project_name=project_name)
        return "", HTTPStatus.NO_CONTENT

251 

252 

def _completion_event_generator(agent_name: str, messages: List[Dict], project_name: str) -> Iterable[str]:
    """Stream an agent completion as ``data: <json>\\n\\n`` event strings.

    Args:
        agent_name: Name of the agent to query.
        messages: Conversation messages forwarded to ``get_completion``.
        project_name: Name of the project the agent is looked up in.

    Yields:
        One event string per chunk of the completion stream. Any exception
        raised while preparing or streaming is logged and reported as a
        single ``{"error": ...}`` event. A final ``{"type": "end"}`` event is
        emitted whenever the generator runs to completion.
    """
    logger.info(f"Starting completion event generator for agent {agent_name}")

    def json_serialize(data):
        return f"data: {json.dumps(data)}\n\n"

    try:
        # Populate API key by default if not present.
        session = SessionController()
        existing_agent = session.agents_controller.get_agent(agent_name, project_name=project_name)
        if not existing_agent.params:
            existing_agent.params = {}
        existing_agent.params["openai_api_key"] = existing_agent.params.get(
            "openai_api_key", os.getenv("OPENAI_API_KEY")
        )
        # Have to commit/flush here so DB isn't locked while streaming.
        db.session.commit()

        if "mode" not in existing_agent.params and any(
            rel.skill.type == "retrieval" for rel in existing_agent.skills_relationships
        ):
            existing_agent.params["mode"] = "retrieval"

        completion_stream = session.agents_controller.get_completion(
            existing_agent, messages, project_name=project_name, tools=[], stream=True
        )

        for chunk in completion_stream:
            if isinstance(chunk, str) and chunk.startswith("data: "):
                # The chunk is already formatted correctly, yield it as is
                yield chunk
            elif isinstance(chunk, dict):
                if "error" in chunk:
                    # Handle error chunks
                    logger.error(f"Error in completion stream: {chunk['error']}")
                    yield json_serialize({"error": chunk["error"]})
                elif chunk.get("type") in ("context", "sql"):
                    # Context and SQL messages are reduced to type + content.
                    yield json_serialize({"type": chunk["type"], "content": chunk.get("content")})
                else:
                    # Chunk should already be formatted by agent stream.
                    yield json_serialize(chunk)
            else:
                # For any other unexpected chunk types
                yield json_serialize({"output": str(chunk)})

            logger.debug(f"Streamed chunk: {str(chunk)[:100]}...")

        logger.info("Completion stream finished")

    except Exception:
        error_message = "Error in completion event generator"
        logger.exception(error_message)
        yield json_serialize({"error": error_message})

    # Emitted here rather than from a ``finally`` clause: yielding inside
    # ``finally`` while the generator is being closed (e.g. the consumer stops
    # iterating early) makes Python raise
    # "RuntimeError: generator ignored GeneratorExit".
    yield json_serialize({"type": "end"})

313 

314 

@ns_conf.route("/<project_name>/agents/<agent_name>/completions/stream")
@ns_conf.param("project_name", "Name of the project")
@ns_conf.param("agent_name", "Name of the agent")
class AgentCompletionsStream(Resource):
    @ns_conf.doc("agent_completions_stream")
    @api_endpoint_metrics("POST", "/agents/agent/completions/stream")
    def post(self, project_name, agent_name):
        """Queries an agent given a list of messages and streams the answer as events"""
        # Extract messages from request (HTTP format only)
        if "messages" not in request.json:
            return http_error(
                HTTPStatus.BAD_REQUEST,
                "Missing parameter",
                'Must provide "messages" parameter in POST body',
            )

        messages = request.json["messages"]

        session = SessionController()
        try:
            existing_agent = session.agents_controller.get_agent(agent_name, project_name=project_name)
            if existing_agent is None:
                logger.warning(f"Agent {agent_name} not found in project {project_name}")
                return http_error(
                    HTTPStatus.NOT_FOUND, "Agent not found", f"Agent with name {agent_name} does not exist"
                )
        except (ValueError, EntityNotExistsError) as e:
            # Same exception pair the other agent endpoints treat as
            # "project missing"; catching only ValueError let
            # EntityNotExistsError escape as an unhandled error.
            logger.warning(f"Project {project_name} not found: {e}")
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        try:
            # Calling a generator function only creates the generator; its
            # body runs once the response is iterated.
            gen = _completion_event_generator(agent_name, messages, project_name)
            logger.info(f"Starting streaming response for agent {agent_name}")
            return Response(gen, mimetype="text/event-stream")
        except Exception as e:
            logger.exception(f"Error during streaming for agent {agent_name}:")
            return http_error(
                HTTPStatus.INTERNAL_SERVER_ERROR, "Streaming error", f"An error occurred during streaming: {e}"
            )

355 

356 

@ns_conf.route("/<project_name>/agents/<agent_name>/completions")
@ns_conf.param("project_name", "Name of the project")
@ns_conf.param("agent_name", "Name of the agent")
class AgentCompletions(Resource):
    @ns_conf.doc("agent_completions")
    @api_endpoint_metrics("POST", "/agents/agent/completions")
    def post(self, project_name, agent_name):
        """Queries an agent given a list of messages"""
        # Check for required parameters.
        if "messages" not in request.json:
            return http_error(
                HTTPStatus.BAD_REQUEST, "Missing parameter", 'Must provide "messages" parameter in POST body'
            )
        agents_controller = AgentsController()

        try:
            existing_agent = agents_controller.get_agent(agent_name, project_name=project_name)
            if existing_agent is None:
                return http_error(
                    HTTPStatus.NOT_FOUND, "Agent not found", f"Agent with name {agent_name} does not exist"
                )
        except (ValueError, EntityNotExistsError):
            # Project needs to exist.
            return http_error(
                HTTPStatus.NOT_FOUND, "Project not found", f"Project with name {project_name} does not exist"
            )

        # An agent stored without params would make the membership test and
        # the ``.get`` below fail; only the ``None`` case is normalised.
        if existing_agent.params is None:
            existing_agent.params = {}

        # set mode to `retrieval` if agent has a skill of type `retrieval` and mode is not set
        if "mode" not in existing_agent.params and any(
            rel.skill.type == "retrieval" for rel in existing_agent.skills_relationships
        ):
            existing_agent.params["mode"] = "retrieval"

        messages = request.json["messages"]

        completion = agents_controller.get_completion(
            existing_agent,
            messages,
            project_name=project_name,
            # Don't need to include backoffice_db related tools into this endpoint.
            # Underlying handler (e.g. Langchain) will handle default tools like mdb_read, mdb_write, etc.
            tools=[],
        )

        # The answer is read from the last row of the completion result.
        last_row = completion.iloc[-1]
        model_output = last_row[agents_controller.assistant_column]
        trace_id = last_row["trace_id"]

        response = {"message": {"content": model_output, "role": "assistant", "trace_id": trace_id}}

        if existing_agent.params.get("return_context", False):
            context = []
            if "context" in completion.columns:
                try:
                    last_context = last_row["context"]
                    if last_context:
                        context = json.loads(last_context)
                except (json.JSONDecodeError, TypeError, IndexError):
                    # Keep context as an empty list when it cannot be decoded
                    # (TypeError covers values json.loads does not accept).
                    logger.warning("Error decoding context:", exc_info=True)

            response["message"]["context"] = context

        return response