Coverage for mindsdb / api / a2a / agent.py: 0%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import json 

2from typing import Any, AsyncIterable, Dict, List 

3import requests 

4import httpx 

5from mindsdb.api.a2a.utils import to_serializable, convert_a2a_message_to_qa_format 

6from mindsdb.api.a2a.constants import DEFAULT_STREAM_TIMEOUT 

7from mindsdb.api.a2a.common.types import A2AClientError, A2AClientHTTPError 

8from mindsdb.utilities import log 

9from mindsdb.utilities.config import config 

10 

11logger = log.getLogger(__name__) 

12 

13 

class MindsDBAgent:
    """An agent that communicates with MindsDB over HTTP following the A2A protocol.

    The connector talks to the MindsDB HTTP API whose host and port are read
    from the ``api.http`` section of the MindsDB config. Two access paths are
    provided: a blocking SQL query (``invoke``) and an SSE streaming
    completion (``streaming_invoke`` / ``stream``).
    """

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain", "application/json"]

    # Columns probed, in this order, for the agent's answer in the first row.
    _RESULT_COLUMNS = ("response", "result", "answer", "completion", "output")

    def __init__(
        self,
        agent_name="my_agent",
        project_name="mindsdb",
        user_info: Dict[str, Any] | None = None,
    ):
        """Resolve the MindsDB HTTP endpoint and prepare request headers.

        Args:
            agent_name: Name of the MindsDB agent to address.
            project_name: Name of the project that owns the agent.
            user_info: Optional mapping sent as HTTP headers on every request;
                entries whose value is ``None`` are dropped. ``None`` (the
                default) means no extra headers.
        """
        self.agent_name = agent_name
        self.project_name = project_name

        http_config = config.get("api", {}).get("http", {})
        port = http_config.get("port", 47334)
        host = http_config.get("host", "127.0.0.1")

        # A wildcard ("0.0.0.0") or empty bind address is not a usable
        # destination for a client, so connect through loopback instead.
        if host in ("0.0.0.0", ""):
            host = "127.0.0.1"

        # base_url keeps its trailing slash; _api_url() joins paths onto it
        # without producing a double slash.
        self.base_url = f"http://{host}:{port}/"
        self.agent_url = self._api_url(f"api/projects/{project_name}/agents/{agent_name}")
        self.sql_url = self._api_url("api/sql/query")

        # The default user_info=None previously crashed on `.items()`.
        self.headers = {k: v for k, v in (user_info or {}).items() if v is not None}
        logger.info(f"Initialized MindsDB agent connector to {self.base_url}")

    def _api_url(self, path: str) -> str:
        """Join ``path`` onto ``base_url`` with exactly one separating slash."""
        return f"{self.base_url.rstrip('/')}/{path.lstrip('/')}"

    @staticmethod
    def _text_result(text: str) -> Dict[str, Any]:
        """Build a result payload carrying ``text`` as its single text part."""
        return {"content": text, "parts": [{"type": "text", "text": text}]}

    def invoke(self, query, session_id) -> Dict[str, Any]:
        """Send a query to the MindsDB agent using SQL API.

        Args:
            query: Question text; single quotes are doubled before it is
                embedded in the SQL string literal.
            session_id: Accepted for interface compatibility; not used here.

        Returns:
            A dict with ``content`` and ``parts``. Failures (HTTP errors,
            empty result, unexpected exceptions) are reported in the same
            shape with an error message as the text instead of being raised.
        """
        try:
            escaped_query = query.replace("'", "''")
            # NOTE(review): project/agent names are interpolated unquoted and
            # only single quotes are escaped in the question; confirm both
            # are safe for the identifiers and inputs this is used with.
            sql_query = f"SELECT * FROM {self.project_name}.{self.agent_name} WHERE question = '{escaped_query}'"
            logger.debug(f"Sending SQL query to MindsDB: {sql_query[:100]}...")

            response = requests.post(self.sql_url, json={"query": sql_query}, headers=self.headers)
            response.raise_for_status()
            data = response.json()
            logger.debug(f"Received response from MindsDB: {json.dumps(data)[:200]}...")

            rows = data.get("data") if isinstance(data, dict) else None
            if not rows:
                error_msg = "Error: No data returned from MindsDB"
                logger.error(error_msg)
                return self._text_result(error_msg)

            result_row = rows[0]
            for column in self._RESULT_COLUMNS:
                if column in result_row:
                    content = result_row[column]
                    # str() keeps the log preview safe for non-string values.
                    logger.info(f"Found result in column '{column}': {str(content)[:100]}...")
                    return {
                        "content": content,
                        "parts": [{"type": "text", "text": content}],
                    }

            logger.info("No specific result column found, returning full row")
            content = json.dumps(result_row, indent=2)
            parts = [{"type": "text", "text": content}]
            if isinstance(result_row, dict):
                # Also expose the structured row alongside its text rendering.
                parts.append(
                    {
                        "type": "data",
                        "data": result_row,
                        "metadata": {"subtype": "json"},
                    }
                )
            return {
                "content": content,
                "parts": parts,
            }
        except requests.exceptions.RequestException as e:
            # The message must be built here: no earlier assignment is
            # guaranteed to have run when the request itself fails.
            error_msg = f"Error connecting to MindsDB: {e}"
            logger.exception("Error connecting to MindsDB:")
            return self._text_result(error_msg)
        except Exception as e:
            error_msg = f"Error: {e}"
            logger.exception("Error: ")
            return self._text_result(error_msg)

    async def streaming_invoke(self, messages, timeout=DEFAULT_STREAM_TIMEOUT) -> AsyncIterable[Any]:
        """Stream completion events from the agent's SSE endpoint.

        Posts ``messages`` to ``.../completions/stream`` and yields the decoded
        JSON payload of every ``data:`` line. Blank lines and lines without
        the ``data:`` prefix are ignored; payloads that are not valid JSON are
        logged and skipped. After the response body is exhausted a final
        ``{"is_task_complete": True}`` item is yielded.

        Args:
            messages: Conversation passed through ``to_serializable`` and sent
                as the ``messages`` field of the JSON body.
            timeout: Timeout handed to ``httpx.AsyncClient``.

        Raises:
            TimeoutError: On ``httpx.ReadTimeout``.
            ConnectionError: On ``httpx.ConnectTimeout`` or ``httpx.ConnectError``.
            A2AClientHTTPError: On an error HTTP status from the server.
            A2AClientError: On any other ``httpx.RequestError``.
        """
        url = self._api_url(f"api/projects/{self.project_name}/agents/{self.agent_name}/completions/stream")
        logger.debug(f"Sending streaming request to MindsDB agent: {self.agent_name}")
        try:
            async with httpx.AsyncClient(timeout=timeout, headers=self.headers) as client:
                async with client.stream("POST", url, json={"messages": to_serializable(messages)}) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        if not line.strip():
                            continue
                        # Only process actual SSE data lines; comments and
                        # control lines are ignored.
                        if not line.startswith("data:"):
                            continue
                        payload = line[len("data:") :].strip()
                        try:
                            event = json.loads(payload)
                        except ValueError as e:
                            logger.exception(f"Failed to parse SSE JSON payload: {e}; line: {payload}")
                            continue
                        # Yield outside the try so an exception thrown into
                        # the generator is not mistaken for a parse failure.
                        yield event
                    # Signal the end of the stream
                    yield {"is_task_complete": True}
        except httpx.ReadTimeout as e:
            error_msg = f"Request timed out after {timeout} seconds while streaming from agent '{self.agent_name}'"
            logger.error(error_msg)
            raise TimeoutError(error_msg) from e
        except httpx.ConnectTimeout as e:
            error_msg = f"Connection timeout while connecting to agent '{self.agent_name}' at {url}"
            logger.error(error_msg)
            raise ConnectionError(error_msg) from e
        except httpx.ConnectError as e:
            error_msg = f"Failed to connect to agent '{self.agent_name}' at {url}: {str(e)}"
            logger.error(error_msg)
            raise ConnectionError(error_msg) from e
        except httpx.HTTPStatusError as e:
            error_msg = f"HTTP error {e.response.status_code} from agent '{self.agent_name}': {str(e)}"
            logger.error(error_msg)
            raise A2AClientHTTPError(status_code=e.response.status_code, message=error_msg) from e
        except httpx.RequestError as e:
            error_msg = f"Request error while streaming from agent '{self.agent_name}': {str(e)}"
            logger.error(error_msg)
            raise A2AClientError(error_msg) from e

    async def stream(
        self,
        query: str,
        session_id: str,
        history: List[dict] | None = None,
        timeout: int = DEFAULT_STREAM_TIMEOUT,
    ) -> AsyncIterable[Dict[str, Any]]:
        """Stream responses from the MindsDB agent (uses streaming API endpoint).

        Wraps ``query`` (and ``history``, when given) in an A2A user message,
        converts it with ``convert_a2a_message_to_qa_format`` and relays every
        item produced by ``streaming_invoke``.

        Args:
            query: Text of the current user turn.
            session_id: Accepted for interface compatibility; not used here.
            history: Optional prior messages attached under ``history``.
            timeout: Forwarded to ``streaming_invoke``.

        Yields:
            For each upstream item, a dict with ``is_task_complete`` False,
            ``content`` (the item's ``text``, else its ``output``, else the
            item serialised as JSON) and empty ``metadata``. If anything
            raises, a single error dict with ``is_task_complete`` True is
            yielded instead of propagating the exception.
        """
        try:
            # Create A2A message structure with history and current query
            a2a_message = {"role": "user", "parts": [{"text": query}]}
            if history:
                a2a_message["history"] = history

            # Convert to Q&A format using centralized utility
            formatted_messages = convert_a2a_message_to_qa_format(a2a_message)
            logger.debug(f"Formatted messages for agent: {formatted_messages}")

            # NOTE(review): the end-of-stream sentinel emitted by
            # streaming_invoke is wrapped like any other chunk, so it reaches
            # the consumer as JSON text with is_task_complete False. Left
            # as-is; confirm whether consumers rely on this.
            async for chunk in self.streaming_invoke(formatted_messages, timeout=timeout):
                content_value = chunk.get("text") or chunk.get("output") or json.dumps(chunk)
                yield {"is_task_complete": False, "content": content_value, "metadata": {}}
        except Exception as e:
            logger.exception(f"Error in streaming: {e}")
            yield {
                "is_task_complete": True,
                "parts": [
                    {
                        "type": "text",
                        "text": f"Error: {e}",
                    }
                ],
                "metadata": {
                    "type": "reasoning",
                    "subtype": "error",
                },
            }