Coverage for mindsdb / api / a2a / agent.py: 0%
108 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import json
2from typing import Any, AsyncIterable, Dict, List
3import requests
4import httpx
5from mindsdb.api.a2a.utils import to_serializable, convert_a2a_message_to_qa_format
6from mindsdb.api.a2a.constants import DEFAULT_STREAM_TIMEOUT
7from mindsdb.api.a2a.common.types import A2AClientError, A2AClientHTTPError
8from mindsdb.utilities import log
9from mindsdb.utilities.config import config
11logger = log.getLogger(__name__)
class MindsDBAgent:
    """An agent that communicates with MindsDB over HTTP following the A2A protocol.

    The MindsDB HTTP API location is read from the ``api.http`` section of the
    MindsDB config. Two endpoints are used: the SQL query endpoint
    (``invoke``) and the agent completions streaming endpoint
    (``streaming_invoke`` / ``stream``).
    """

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain", "application/json"]

    # Keys probed, in this order, when looking for the agent's answer in the
    # first row of a SQL API result.
    _RESULT_COLUMNS = ("response", "result", "answer", "completion", "output")

    def __init__(
        self,
        agent_name="my_agent",
        project_name="mindsdb",
        user_info: Dict[str, Any] | None = None,
    ):
        """Build the endpoint URLs and request headers for one agent.

        Args:
            agent_name: Name of the MindsDB agent to talk to.
            project_name: Name of the MindsDB project that owns the agent.
            user_info: Optional mapping sent as HTTP headers on every request;
                entries whose value is ``None`` are dropped. ``None`` (the
                default) means no extra headers.
        """
        self.agent_name = agent_name
        self.project_name = project_name

        http_config = config.get("api", {}).get("http", {})
        port = http_config.get("port", 47334)
        host = http_config.get("host", "127.0.0.1")

        # A wildcard/empty bind address is not a connectable destination, so
        # fall back to the loopback address in that case.
        if host in ("0.0.0.0", ""):
            url = f"http://127.0.0.1:{port}/"
        else:
            url = f"http://{host}:{port}/"

        # base_url keeps its trailing slash for backward compatibility; the
        # derived URLs are joined through _api_url so they no longer contain
        # a doubled slash ("http://host:port//api/...").
        self.base_url = url
        self.agent_url = self._api_url(f"api/projects/{project_name}/agents/{agent_name}")
        self.sql_url = self._api_url("api/sql/query")
        # user_info defaults to None, so guard before iterating.
        self.headers = {k: v for k, v in (user_info or {}).items() if v is not None}
        logger.info(f"Initialized MindsDB agent connector to {self.base_url}")

    def _api_url(self, path: str) -> str:
        """Join ``path`` onto ``self.base_url`` with exactly one slash between them."""
        return f"{self.base_url.rstrip('/')}/{path.lstrip('/')}"

    @staticmethod
    def _text_result(text) -> Dict[str, Any]:
        """Wrap ``text`` in the ``{"content", "parts"}`` payload returned by ``invoke``."""
        return {
            "content": text,
            "parts": [{"type": "text", "text": text}],
        }

    def invoke(self, query, session_id) -> Dict[str, Any]:
        """Send a query to the MindsDB agent using SQL API.

        Args:
            query: Question text; single quotes are doubled before it is
                embedded in the SQL statement.
            session_id: Accepted for interface compatibility; not used here.

        Returns:
            A dict with ``content`` and ``parts``. Failures (connection
            errors, empty result, unexpected exceptions) are reported in the
            same shape with an error message as the text instead of raising.
        """
        try:
            escaped_query = query.replace("'", "''")
            sql_query = f"SELECT * FROM {self.project_name}.{self.agent_name} WHERE question = '{escaped_query}'"
            logger.debug(f"Sending SQL query to MindsDB: {sql_query[:100]}...")
            # NOTE(review): no timeout is passed, so this call can block
            # indefinitely; confirm whether a bound is wanted for SQL queries.
            response = requests.post(self.sql_url, json={"query": sql_query}, headers=self.headers)
            response.raise_for_status()
            data = response.json()
            logger.debug(f"Received response from MindsDB: {json.dumps(data)[:200]}...")

            if "data" in data and len(data["data"]) > 0:
                result_row = data["data"][0]
                for column in self._RESULT_COLUMNS:
                    if column in result_row:
                        content = result_row[column]
                        # str() keeps the log line safe when the value is not
                        # a string (e.g. None or a nested structure).
                        logger.info(f"Found result in column '{column}': {str(content)[:100]}...")
                        return self._text_result(content)

                logger.info("No specific result column found, returning full row")
                result = self._text_result(json.dumps(result_row, indent=2))
                if isinstance(result_row, dict):
                    # Also expose the raw row as a structured data part.
                    result["parts"].append(
                        {
                            "type": "data",
                            "data": result_row,
                            "metadata": {"subtype": "json"},
                        }
                    )
                return result

            error_msg = "Error: No data returned from MindsDB"
            logger.error(error_msg)
            return self._text_result(error_msg)
        except requests.exceptions.RequestException as e:
            logger.exception("Error connecting to MindsDB:")
            # Build the message here: nothing assigned earlier in the try
            # body is guaranteed to exist on this path.
            return self._text_result(f"Error connecting to MindsDB: {e}")
        except Exception as e:
            logger.exception("Error: ")
            return self._text_result(f"Error: {e}")

    async def streaming_invoke(self, messages, timeout=DEFAULT_STREAM_TIMEOUT) -> AsyncIterable[Any]:
        """POST ``messages`` to the agent's streaming endpoint and yield parsed events.

        Each server-sent-event line starting with ``data:`` is JSON-decoded
        and yielded; blank lines and other lines are skipped, and payloads
        that fail to decode are logged and skipped. After the response is
        exhausted, ``{"is_task_complete": True}`` is yielded.

        Args:
            messages: Messages to send; passed through ``to_serializable``.
            timeout: Timeout handed to ``httpx.AsyncClient``.

        Raises:
            TimeoutError: On ``httpx.ReadTimeout``.
            ConnectionError: On ``httpx.ConnectTimeout`` or ``httpx.ConnectError``.
            A2AClientHTTPError: On ``httpx.HTTPStatusError`` (from ``raise_for_status``).
            A2AClientError: On any other ``httpx.RequestError``.
        """
        url = self._api_url(f"api/projects/{self.project_name}/agents/{self.agent_name}/completions/stream")
        logger.debug(f"Sending streaming request to MindsDB agent: {self.agent_name}")
        try:
            async with httpx.AsyncClient(timeout=timeout, headers=self.headers) as client:
                async with client.stream("POST", url, json={"messages": to_serializable(messages)}) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        # Only SSE data lines carry a payload; blank lines,
                        # comments and control lines are ignored.
                        if not line.startswith("data:"):
                            continue
                        payload = line.removeprefix("data:").strip()
                        try:
                            event = json.loads(payload)
                        except ValueError as e:
                            logger.exception(f"Failed to parse SSE JSON payload: {e}; line: {payload}")
                            continue
                        # Yield outside the try so an exception thrown into
                        # the generator by the consumer is not mistaken for
                        # (and swallowed as) a JSON parse failure.
                        yield event
            # Signal the end of the stream
            yield {"is_task_complete": True}
        except httpx.ReadTimeout as e:
            error_msg = f"Request timed out after {timeout} seconds while streaming from agent '{self.agent_name}'"
            logger.error(error_msg)
            raise TimeoutError(error_msg) from e
        except httpx.ConnectTimeout as e:
            error_msg = f"Connection timeout while connecting to agent '{self.agent_name}' at {url}"
            logger.error(error_msg)
            raise ConnectionError(error_msg) from e
        except httpx.ConnectError as e:
            error_msg = f"Failed to connect to agent '{self.agent_name}' at {url}: {str(e)}"
            logger.error(error_msg)
            raise ConnectionError(error_msg) from e
        except httpx.HTTPStatusError as e:
            error_msg = f"HTTP error {e.response.status_code} from agent '{self.agent_name}': {str(e)}"
            logger.error(error_msg)
            raise A2AClientHTTPError(status_code=e.response.status_code, message=error_msg) from e
        except httpx.RequestError as e:
            error_msg = f"Request error while streaming from agent '{self.agent_name}': {str(e)}"
            logger.error(error_msg)
            raise A2AClientError(error_msg) from e

    async def stream(
        self,
        query: str,
        session_id: str,
        history: List[dict] | None = None,
        timeout: int = DEFAULT_STREAM_TIMEOUT,
    ) -> AsyncIterable[Dict[str, Any]]:
        """Stream responses from the MindsDB agent (uses streaming API endpoint).

        Args:
            query: The current user query.
            session_id: Accepted for interface compatibility; not used here.
            history: Optional prior messages, attached to the A2A message
                under the ``history`` key when non-empty.
            timeout: Forwarded to ``streaming_invoke``.

        Yields:
            One dict per upstream chunk with ``is_task_complete`` set to
            ``False``, a ``content`` string and empty ``metadata``. If
            anything raises, a single final dict with ``is_task_complete``
            set to ``True`` and an error text part is yielded instead of
            propagating the exception.
        """
        try:
            # Create A2A message structure with history and current query
            a2a_message = {"role": "user", "parts": [{"text": query}]}
            if history:
                a2a_message["history"] = history

            # Convert to Q&A format using centralized utility
            formatted_messages = convert_a2a_message_to_qa_format(a2a_message)
            logger.debug(f"Formatted messages for agent: {formatted_messages}")

            streaming_response = self.streaming_invoke(formatted_messages, timeout=timeout)
            async for chunk in streaming_response:
                # Prefer the chunk's text/output; otherwise forward the whole
                # chunk serialized as JSON.
                # NOTE(review): the end-of-stream sentinel from
                # streaming_invoke also lands here and is re-wrapped with
                # is_task_complete=False - confirm consumers expect that.
                content_value = chunk.get("text") or chunk.get("output") or json.dumps(chunk)
                wrapped_chunk = {"is_task_complete": False, "content": content_value, "metadata": {}}
                yield wrapped_chunk
        except Exception as e:
            logger.exception(f"Error in streaming: {e}")
            yield {
                "is_task_complete": True,
                "parts": [
                    {
                        "type": "text",
                        "text": f"Error: {e}",
                    }
                ],
                "metadata": {
                    "type": "reasoning",
                    "subtype": "error",
                },
            }