Coverage for mindsdb / api / mcp / __init__.py: 0%

66 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from textwrap import dedent 

2from typing import Any 

3from contextlib import asynccontextmanager 

4from collections.abc import AsyncIterator 

5from dataclasses import dataclass 

6 

7from mcp.server.fastmcp import FastMCP 

8from starlette.requests import Request 

9from starlette.responses import JSONResponse 

10 

11from mindsdb.api.mysql.mysql_proxy.classes.fake_mysql_proxy import FakeMysqlProxy 

12from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE as SQL_RESPONSE_TYPE 

13from mindsdb.interfaces.storage import db 

14from mindsdb.utilities import log 

15 

# Module-level logger, named after this module via the MindsDB logging helper.
logger = log.getLogger(__name__)

17 

18 

@dataclass
class AppContext:
    """Context object yielded by the server lifespan manager."""

    # Handle to the storage layer; `app_lifespan` passes the `db` module here.
    db: Any

22 

23 

@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """Manage the application lifecycle for the MCP server.

    Calls ``db.init()`` on entry and yields an ``AppContext`` wrapping the
    ``db`` module. No cleanup is performed on exit (see the TODO below).

    Args:
        server: The FastMCP server this lifespan is attached to (unused here).

    Yields:
        The ``AppContext`` made available for the lifetime of the server.
    """
    # Startup: initialise the storage layer before the context is handed out.
    db.init()
    app_context = AppContext(db=db)
    try:
        yield app_context
    finally:
        # TODO: We need better way to handle this in storage/db.py
        pass

34 

35 

# Shared FastMCP server instance, wired to the lifespan manager defined above.
# The tool functions in this module register themselves on it via `@mcp.tool`.
mcp = FastMCP(
    "MindsDB",
    dependencies=["mindsdb"],  # Add any additional dependencies
    lifespan=app_lifespan,
)

42 

43 

# SQL statement issued by the `list_databases` tool.
LISTING_QUERY = "SHOW DATABASES"


# Description shown to MCP clients for the `query` tool. Keep the documented
# response shapes in sync with what `query` actually returns: error responses
# carry an `error_code` alongside `error_message`.
query_tool_description = dedent("""\
    Executes a SQL query against MindsDB.

    A database must be specified either in the `context` parameter or directly in the query string (e.g., `SELECT * FROM my_database.my_table`). Queries like `SELECT * FROM my_table` will fail without a `context`.

    Args:
        query (str): The SQL query to execute.
        context (dict, optional): The default database context. For example, `{"db": "my_postgres"}`.

    Returns:
        A dictionary describing the result.
        - For a successful query with no data to return (e.g., an `UPDATE` statement), the response is `{"type": "ok"}`.
        - If the query returns tabular data, the response is a dictionary containing `data` (a list of rows) and `column_names` (a list of column names). For example: `{"type": "table", "data": [[1, "a"], [2, "b"]], "column_names": ["column_a", "column_b"]}`.
        - In case of an error, the response is `{"type": "error", "error_code": 0, "error_message": "the error message"}`.
""")

63 

64 

@mcp.tool(name="query", description=query_tool_description)
def query(query: str, context: dict | None = None) -> dict[str, Any]:
    """Execute a SQL query against MindsDB.

    Args:
        query: The SQL query to execute.
        context: Optional context handed to the proxy via ``set_context``.
            ``None`` is treated as an empty dict.

    Returns:
        A dict whose ``type`` key is a ``SQL_RESPONSE_TYPE`` value:

        - OK: ``{"type": ...}`` only.
        - TABLE: additionally ``data`` (rows as lists) and ``column_names``
          (each column's alias, falling back to its name).
        - ERROR: additionally ``error_code`` and ``error_message``. This is
          returned when the proxy reports an error result, when the result
          type is not recognised, or when any exception is raised while
          processing (``error_code`` is 0 in the latter two cases).
    """
    if context is None:
        context = {}

    logger.debug(f"Incoming MCP query: {query}")

    mysql_proxy = FakeMysqlProxy()
    mysql_proxy.set_context(context)

    try:
        result = mysql_proxy.process_query(query)

        if result.type == SQL_RESPONSE_TYPE.OK:
            return {"type": SQL_RESPONSE_TYPE.OK}

        if result.type == SQL_RESPONSE_TYPE.TABLE:
            return {
                "type": SQL_RESPONSE_TYPE.TABLE,
                "data": result.result_set.to_lists(json_types=True),
                "column_names": [column.alias or column.name for column in result.result_set.columns],
            }

        if result.type == SQL_RESPONSE_TYPE.ERROR:
            # Forward the proxy's own error details (as `list_databases` does)
            # instead of masking them behind "Unknown response type".
            return {
                "type": SQL_RESPONSE_TYPE.ERROR,
                "error_code": result.error_code,
                "error_message": result.error_message,
            }

        return {"type": SQL_RESPONSE_TYPE.ERROR, "error_code": 0, "error_message": "Unknown response type"}

    except Exception as e:
        # Tool boundary: report the failure to the MCP client rather than raise.
        logger.exception("Error processing query:")
        return {"type": SQL_RESPONSE_TYPE.ERROR, "error_code": 0, "error_message": str(e)}

103 

104 

# Description shown to MCP clients for the `list_databases` tool.
# Adjacent string literals are joined by the parser into a single string.
list_databases_tool_description = (
    "Returns a list of all database connections currently available in MindsDB. "
    "The tool takes no parameters and responds with a list of database names, "
    'for example: ["my_postgres", "my_mysql", "test_db"].'
)

110 

111 

@mcp.tool(name="list_databases", description=list_databases_tool_description)
def list_databases() -> list[str]:
    """List all databases in MindsDB.

    Runs ``LISTING_QUERY`` through a fresh ``FakeMysqlProxy``.

    Returns:
        For a TABLE result, a list holding the first value of every row.
        For any other outcome a dict is returned instead: ``{"type": "ok"}``
        for an OK result, or ``{"type": "error", "error_code": ...,
        "error_message": ...}`` when the proxy reports an error, the result
        type is not recognised, or an exception is raised.

    NOTE(review): the ``list[str]`` annotation does not cover the dict
    responses; it is left unchanged here because it may feed the MCP tool
    schema - confirm before widening it.
    """
    mysql_proxy = FakeMysqlProxy()

    try:
        result = mysql_proxy.process_query(LISTING_QUERY)

        if result.type == SQL_RESPONSE_TYPE.ERROR:
            return {
                "type": "error",
                "error_code": result.error_code,
                "error_message": result.error_message,
            }

        if result.type == SQL_RESPONSE_TYPE.OK:
            return {"type": "ok"}

        if result.type == SQL_RESPONSE_TYPE.TABLE:
            rows = result.result_set.to_lists(json_types=True)
            # Only the first value of each row is returned.
            return [row[0] for row in rows]

        # Previously an unrecognised type fell through and returned None;
        # report it explicitly, matching the `query` tool's fallback.
        return {
            "type": "error",
            "error_code": 0,
            "error_message": "Unknown response type",
        }

    except Exception as e:
        # Tool boundary: report the failure to the MCP client rather than raise.
        logger.exception("Error while retrieving list of databases")
        return {
            "type": "error",
            "error_code": 0,
            "error_message": str(e),
        }

147 

148 

def _get_status(request: Request) -> JSONResponse:
    """Return a fixed JSON payload with basic server information.

    The frontend can call this endpoint to check that the MCP server is running.

    Args:
        request: The incoming request (not inspected).

    Returns:
        A ``JSONResponse`` carrying the ``status`` and ``service`` fields.
    """
    return JSONResponse({"status": "ok", "service": "mindsdb-mcp"})

161 

162 

def get_mcp_app():
    """Build the MCP server's SSE application with the status route attached.

    Returns:
        The object returned by ``mcp.sse_app()``, after a ``GET /status``
        route served by ``_get_status`` has been added to it.
    """
    sse_application = mcp.sse_app()
    sse_application.add_route("/status", _get_status, methods=["GET"])
    return sse_application