Coverage for mindsdb / interfaces / agents / run_mcp_agent.py: 0%
132 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import sys
2import argparse
3import asyncio
4from typing import List, Dict
5from contextlib import AsyncExitStack
7from mcp import ClientSession, StdioServerParameters
8from mcp.client.stdio import stdio_client
10from mindsdb.utilities import log
11from mindsdb.interfaces.agents.mcp_client_agent import create_mcp_agent
13logger = log.getLogger(__name__)
async def run_conversation(agent_wrapper, messages: List[Dict[str, str]], stream: bool = False):
    """Run one conversation turn with the agent and print the reply to stdout.

    Args:
        agent_wrapper: Object exposing ``acompletion(messages)`` and
            ``acompletion_stream(messages)``. Responses are read through
            ``["choices"][0]`` (``["message"]["content"]`` for a full reply,
            ``["delta"]`` for a streamed chunk).
        messages: Conversation history as ``{"role": ..., "content": ...}`` dicts.
        stream: When true, print the reply chunk by chunk as it arrives.

    Returns:
        The reply text that was displayed (for a stream, the concatenation of
        every non-empty chunk), or ``None`` when the agent call raised. The
        exception is logged here rather than propagated.
    """
    try:
        if stream:
            logger.info("Streaming response:")
            pieces = []
            async for chunk in agent_wrapper.acompletion_stream(messages):
                content = chunk["choices"][0]["delta"].get("content", "")
                if content:
                    # Streaming output has to reach the terminal immediately;
                    # it is mirrored to the debug log as well.
                    logger.debug(f"Stream content: {content}")
                    sys.stdout.write(content)
                    sys.stdout.flush()
                    pieces.append(content)
            logger.debug("End of stream")
            sys.stdout.write("\n\n")
            sys.stdout.flush()
            # Hand the assembled reply back so callers can record it in the
            # conversation history without asking the agent again.
            return "".join(pieces)

        logger.info("Getting response...")
        response = await agent_wrapper.acompletion(messages)
        content = response["choices"][0]["message"]["content"]
        logger.info(f"Response: {content}")
        # The response is shown to the user in addition to being logged.
        sys.stdout.write(f"{content}\n")
        sys.stdout.flush()
        return content
    except Exception:
        logger.exception("Error during agent conversation:")
        return None
async def execute_direct_query(query):
    """Send one SQL statement to the ``query`` tool of an MCP server.

    The connection uses the stdio transport with the command
    ``python -m mindsdb --api=mcp``. The tool list and the query result are
    written to the log; nothing is returned. Exceptions raised while
    connecting or querying are logged rather than propagated.
    """
    logger.info(f"Executing direct SQL query: {query}")

    # Parameters describing how to reach the MCP server over stdio.
    launch_params = StdioServerParameters(command="python", args=["-m", "mindsdb", "--api=mcp"], env=None)

    # The exit stack owns both the transport and the session, so they are
    # released together when this block is left.
    async with AsyncExitStack() as exit_stack:
        try:
            reader, writer = await exit_stack.enter_async_context(stdio_client(launch_params))
            mcp_session = await exit_stack.enter_async_context(ClientSession(reader, writer))
            await mcp_session.initialize()

            # Report which tools the server offers.
            listing = await mcp_session.list_tools()
            available = [entry.name for entry in listing.tools]
            logger.info(f"Available tools: {available}")

            # Nothing to do unless the server exposes a tool named "query".
            if "query" not in available:
                logger.error("No 'query' tool found in MCP server")
                return

            outcome = await mcp_session.call_tool("query", {"query": query})
            logger.info(f"Query result: {outcome.content}")
        except Exception:
            logger.exception("Error executing query:")
            logger.info("Make sure the MindsDB server is running with HTTP enabled: python -m mindsdb --api=http")
def _build_arg_parser():
    """Build the command-line parser for the MCP client agent runner."""
    parser = argparse.ArgumentParser(description="Run an agent as an MCP client")
    parser.add_argument("--agent", type=str, help="Name of the agent to use")
    parser.add_argument("--project", type=str, default="mindsdb", help="Project containing the agent")
    parser.add_argument("--host", type=str, default="127.0.0.1", help="MCP server host")
    parser.add_argument("--port", type=int, default=47337, help="MCP server port")
    parser.add_argument("--query", type=str, help="Query to send to the agent")
    parser.add_argument("--stream", action="store_true", help="Stream the response")
    parser.add_argument("--execute-direct", type=str, help="Execute a direct SQL query via MCP (for testing)")
    return parser


async def _run_sql_command(agent_wrapper, sql_query):
    """Run an interactive ``sql:`` command through the agent's MCP session and print the outcome."""
    logger.info(f"Executing SQL: {sql_query}")
    try:
        # Use the tool from the agent's own session, if it has one.
        session = getattr(agent_wrapper.agent, "session", None)
        if session:
            result = await session.call_tool("query", {"query": sql_query})
            logger.info(f"SQL result: {result.content}")
            # The result is shown to the user in addition to being logged.
            sys.stdout.write(f"Result: {result.content}\n")
            sys.stdout.flush()
        else:
            logger.error("No active MCP session")
            sys.stdout.write("Error: No active MCP session\n")
            sys.stdout.flush()
    except Exception as e:
        logger.exception("SQL Error:")
        sys.stdout.write(f"SQL Error: {str(e)}\n")
        sys.stdout.flush()


async def _interactive_session(agent_wrapper, stream):
    """Read user input in a loop, dispatching special commands and agent turns until exit."""
    logger.info("Entering interactive mode. Type 'exit' to quit.")
    logger.info("Available commands: exit/quit, clear, sql:")

    # The instructions are shown to the user as well as logged.
    sys.stdout.write("\nEntering interactive mode. Type 'exit' to quit.\n")
    sys.stdout.write("\nAvailable commands:\n")
    sys.stdout.write("  exit, quit - Exit the program\n")
    sys.stdout.write("  clear - Clear conversation history\n")
    sys.stdout.write("  sql: <query> - Execute a direct SQL query via MCP\n")
    sys.stdout.flush()

    messages = []

    while True:
        # NOTE: input() is a synchronous call, so the event loop does not run
        # while the program waits for the user.
        try:
            user_input = input("\nYou: ")
        except EOFError:
            # End of input (Ctrl-D or exhausted piped stdin) ends the session
            # the same way an explicit "exit" does.
            logger.info("Exiting interactive mode")
            break

        command = user_input.lower()
        if command in ("exit", "quit"):
            logger.info("Exiting interactive mode")
            break
        if command == "clear":
            messages = []
            logger.info("Conversation history cleared")
            sys.stdout.write("Conversation history cleared\n")
            sys.stdout.flush()
            continue
        if command.startswith("sql:"):
            # Direct SQL execution; everything after the "sql:" prefix is the query.
            await _run_sql_command(agent_wrapper, user_input[4:].strip())
            continue

        messages.append({"role": "user", "content": user_input})
        reply = await run_conversation(agent_wrapper, messages, stream)

        # Record the assistant's turn. Use the reply text if run_conversation
        # hands one back, so the history matches what was displayed and the
        # agent is not queried a second time; if it returns nothing, fetch the
        # completion directly so non-streaming history is still recorded.
        if isinstance(reply, str):
            messages.append({"role": "assistant", "content": reply})
        elif not stream:
            response = await agent_wrapper.acompletion(messages)
            messages.append({"role": "assistant", "content": response["choices"][0]["message"]["content"]})


async def main():
    """Command-line entry point: run an agent as an MCP client.

    Modes, selected by the parsed arguments:
      * ``--execute-direct SQL``: run one SQL statement via MCP and exit.
      * ``--agent NAME --query TEXT``: send a single message to the agent.
      * ``--agent NAME`` alone: interactive chat loop.

    Returns:
        ``0`` on success, ``1`` if an exception was raised while running.
        A missing ``--agent`` is reported through ``parser.error`` (argparse
        exits the process in that case).
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    try:
        # Initialize database connection
        from mindsdb.interfaces.storage import db

        db.init()

        # Direct SQL execution mode (for testing MCP connection)
        if args.execute_direct:
            await execute_direct_query(args.execute_direct)
            return 0

        # Make sure agent name is provided
        if not args.agent:
            parser.error("the --agent argument is required unless --execute-direct is used")

        # Create the agent
        logger.info(f"Creating MCP client agent for '{args.agent}' in project '{args.project}'")
        logger.info(f"Connecting to MCP server at {args.host}:{args.port}")
        logger.info("Make sure MindsDB server is running with MCP enabled: python -m mindsdb --api=mysql,mcp,http")

        agent_wrapper = create_mcp_agent(
            agent_name=args.agent, project_name=args.project, mcp_host=args.host, mcp_port=args.port
        )

        try:
            if args.query:
                # Run the single query that was provided
                messages = [{"role": "user", "content": args.query}]
                await run_conversation(agent_wrapper, messages, args.stream)
            else:
                await _interactive_session(agent_wrapper, args.stream)
        finally:
            # Release the agent's resources even when the conversation code
            # raised; a failure in cleanup itself is reported by the handler below.
            logger.info("Cleaning up resources")
            await agent_wrapper.cleanup()

    except Exception:
        logger.exception("Error running MCP agent:")
        logger.info("Make sure the MindsDB server is running with MCP enabled: python -m mindsdb --api=mysql,mcp,http")
        return 1

    return 0
if __name__ == "__main__":
    # Drive the async entry point to completion and use its return value as
    # the process exit status.
    exit_status = asyncio.run(main())
    sys.exit(exit_status)