Coverage for mindsdb / interfaces / agents / run_mcp_agent.py: 0%

132 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import sys 

2import argparse 

3import asyncio 

4from typing import List, Dict 

5from contextlib import AsyncExitStack 

6 

7from mcp import ClientSession, StdioServerParameters 

8from mcp.client.stdio import stdio_client 

9 

10from mindsdb.utilities import log 

11from mindsdb.interfaces.agents.mcp_client_agent import create_mcp_agent 

12 

13logger = log.getLogger(__name__) 

14 

15 

async def run_conversation(agent_wrapper, messages: List[Dict[str, str]], stream: bool = False):
    """Run one conversation turn with the agent, print the reply and return it.

    Args:
        agent_wrapper: Object exposing the coroutine ``acompletion(messages)``
            and the async iterator ``acompletion_stream(messages)``. Replies
            are read from ``["choices"][0]["message"]["content"]`` (regular)
            or ``["choices"][0]["delta"]["content"]`` (streaming).
        messages: Conversation so far, passed to the agent unchanged.
        stream: When true, the reply is written to stdout chunk by chunk.

    Returns:
        The reply text that was written to stdout (for streaming, all
        non-empty chunks joined together), or ``None`` when the request
        failed. Failures are logged with a traceback and never raised.
    """
    try:
        if stream:
            logger.info("Streaming response:")
            pieces = []
            async for chunk in agent_wrapper.acompletion_stream(messages):
                content = chunk["choices"][0]["delta"].get("content", "")
                if content:
                    # Content is written straight to stdout for live display
                    # and mirrored to the debug log.
                    logger.debug(f"Stream content: {content}")
                    sys.stdout.write(content)
                    sys.stdout.flush()
                    pieces.append(content)
            logger.debug("End of stream")
            sys.stdout.write("\n\n")
            sys.stdout.flush()
            # Hand the full text back so callers can record it in the history
            # without querying the agent a second time.
            return "".join(pieces)

        logger.info("Getting response...")
        response = await agent_wrapper.acompletion(messages)
        content = response["choices"][0]["message"]["content"]
        logger.info(f"Response: {content}")
        # The log is not the user interface: show the reply on stdout as well.
        sys.stdout.write(f"{content}\n")
        sys.stdout.flush()
        return content
    except Exception:
        logger.exception("Error during agent conversation:")
        return None

42 

43 

async def execute_direct_query(query):
    """Execute a direct SQL query through the MCP ``query`` tool and log the result.

    An MCP client session is opened over the stdio transport, whose server
    command is ``<current interpreter> -m mindsdb --api=mcp``. The available
    tools are listed, and if a tool named ``query`` exists it is called with
    ``{"query": query}``.

    Args:
        query: SQL text forwarded as the ``query`` argument of the tool.

    Returns:
        None. The result content is only logged. Any exception raised while
        connecting or querying is logged with a traceback and swallowed.
    """
    logger.info(f"Executing direct SQL query: {query}")

    # Use the interpreter that is running this script: a bare "python" may be
    # absent from PATH or resolve to an environment without mindsdb installed.
    server_params = StdioServerParameters(
        command=sys.executable or "python", args=["-m", "mindsdb", "--api=mcp"], env=None
    )

    # The exit stack closes the session and the transport in reverse order,
    # including when the function returns early or an error is handled below.
    async with AsyncExitStack() as stack:
        try:
            read_stream, write_stream = await stack.enter_async_context(stdio_client(server_params))
            session = await stack.enter_async_context(ClientSession(read_stream, write_stream))

            await session.initialize()

            # List available tools
            tools_response = await session.list_tools()
            tool_names = [tool.name for tool in tools_response.tools]
            logger.info(f"Available tools: {tool_names}")

            # Only the tool's presence matters; it is invoked by name below.
            if "query" not in tool_names:
                logger.error("No 'query' tool found in MCP server")
                return

            # Execute query
            result = await session.call_tool("query", {"query": query})
            logger.info(f"Query result: {result.content}")
        except Exception:
            logger.exception("Error executing query:")
            logger.info("Make sure the MindsDB server is running with HTTP enabled: python -m mindsdb --api=http")

82 

83 

async def _run_sql_command(agent_wrapper, sql_query: str) -> None:
    """Run ``sql_query`` via the ``query`` tool of the agent's MCP session and print the outcome."""
    logger.info(f"Executing SQL: {sql_query}")
    try:
        session = getattr(agent_wrapper.agent, "session", None)
        if session:
            result = await session.call_tool("query", {"query": sql_query})
            logger.info(f"SQL result: {result.content}")
            # We need to show the result to the user
            sys.stdout.write(f"Result: {result.content}\n")
            sys.stdout.flush()
        else:
            logger.error("No active MCP session")
            sys.stdout.write("Error: No active MCP session\n")
            sys.stdout.flush()
    except Exception as e:
        logger.exception("SQL Error:")
        sys.stdout.write(f"SQL Error: {str(e)}\n")
        sys.stdout.flush()


async def _interactive_session(agent_wrapper, stream: bool) -> None:
    """Chat with the agent from stdin until ``exit``/``quit`` or end of input."""
    logger.info("Entering interactive mode. Type 'exit' to quit.")
    logger.info("Available commands: exit/quit, clear, sql:")

    # We still need to show these instructions to the user
    sys.stdout.write("\nEntering interactive mode. Type 'exit' to quit.\n")
    sys.stdout.write("\nAvailable commands:\n")
    sys.stdout.write(" exit, quit - Exit the program\n")
    sys.stdout.write(" clear - Clear conversation history\n")
    sys.stdout.write(" sql: <query> - Execute a direct SQL query via MCP\n")
    sys.stdout.flush()

    messages = []

    while True:
        try:
            user_input = input("\nYou: ")
        except EOFError:
            # Closed stdin (Ctrl-D, exhausted pipe) is a normal way to leave,
            # not an error.
            logger.info("Exiting interactive mode")
            break

        command = user_input.strip()
        lowered = command.lower()

        if not command:
            # Nothing was typed; do not send an empty message to the agent.
            continue
        if lowered in ("exit", "quit"):
            logger.info("Exiting interactive mode")
            break
        if lowered == "clear":
            messages = []
            logger.info("Conversation history cleared")
            sys.stdout.write("Conversation history cleared\n")
            sys.stdout.flush()
            continue
        if lowered.startswith("sql:"):
            # Everything after the 4-character "sql:" prefix is the query.
            await _run_sql_command(agent_wrapper, command[4:].strip())
            continue

        messages.append({"role": "user", "content": user_input})
        reply = await run_conversation(agent_wrapper, messages, stream)

        # Add assistant's response to the conversation history
        if reply is not None:
            # Record exactly what was shown to the user.
            messages.append({"role": "assistant", "content": reply})
        elif not stream:
            # No reply text was handed back: fetch it explicitly so the
            # history is not left without the assistant's turn.
            response = await agent_wrapper.acompletion(messages)
            messages.append({"role": "assistant", "content": response["choices"][0]["message"]["content"]})


async def main():
    """Command-line entry point: run an agent as an MCP client.

    Modes, chosen from the parsed arguments:
      * ``--execute-direct SQL``: run one SQL query through MCP and exit.
      * ``--agent NAME --query TEXT``: send one message to the agent.
      * ``--agent NAME`` alone: interactive chat on stdin/stdout.

    Returns:
        0 on success, 1 when an exception was raised (it is logged with a
        traceback). Invalid arguments terminate the process through
        ``parser.error``.
    """
    parser = argparse.ArgumentParser(description="Run an agent as an MCP client")
    parser.add_argument("--agent", type=str, help="Name of the agent to use")
    parser.add_argument("--project", type=str, default="mindsdb", help="Project containing the agent")
    parser.add_argument("--host", type=str, default="127.0.0.1", help="MCP server host")
    parser.add_argument("--port", type=int, default=47337, help="MCP server port")
    parser.add_argument("--query", type=str, help="Query to send to the agent")
    parser.add_argument("--stream", action="store_true", help="Stream the response")
    parser.add_argument("--execute-direct", type=str, help="Execute a direct SQL query via MCP (for testing)")

    args = parser.parse_args()

    # Reject an unusable command line before any initialization work is done.
    if not args.execute_direct and not args.agent:
        parser.error("the --agent argument is required unless --execute-direct is used")

    try:
        # Initialize database connection
        from mindsdb.interfaces.storage import db

        db.init()

        # Direct SQL execution mode (for testing MCP connection)
        if args.execute_direct:
            await execute_direct_query(args.execute_direct)
            return 0

        # Create the agent
        logger.info(f"Creating MCP client agent for '{args.agent}' in project '{args.project}'")
        logger.info(f"Connecting to MCP server at {args.host}:{args.port}")
        logger.info("Make sure MindsDB server is running with MCP enabled: python -m mindsdb --api=mysql,mcp,http")

        agent_wrapper = create_mcp_agent(
            agent_name=args.agent, project_name=args.project, mcp_host=args.host, mcp_port=args.port
        )

        try:
            if args.query:
                # Run the single query given on the command line
                messages = [{"role": "user", "content": args.query}]
                await run_conversation(agent_wrapper, messages, args.stream)
            else:
                await _interactive_session(agent_wrapper, args.stream)
        finally:
            # Release the agent's resources even if the conversation raised.
            logger.info("Cleaning up resources")
            await agent_wrapper.cleanup()

    except Exception:
        logger.exception("Error running MCP agent:")
        logger.info("Make sure the MindsDB server is running with MCP enabled: python -m mindsdb --api=mysql,mcp,http")
        return 1

    return 0

193 

194 

if __name__ == "__main__":
    # Drive the async entry point to completion and use its return value
    # as the process exit status.
    exit_status = asyncio.run(main())
    sys.exit(exit_status)