Coverage for mindsdb / integrations / handlers / monetdb_handler / monetdb_handler.py: 0%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2from mindsdb_sql_parser.ast.base import ASTNode 

3from mindsdb.integrations.libs.base import DatabaseHandler 

4from mindsdb.utilities import log 

5from mindsdb_sql_parser import parse_sql 

6from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

7from mindsdb.integrations.libs.response import ( 

8 HandlerStatusResponse as StatusResponse, 

9 HandlerResponse as Response, 

10 RESPONSE_TYPE, 

11) 

12 

13import pandas as pd 

14import pymonetdb as mdb 

15from .utils.monet_get_id import schema_id, table_id 

16from sqlalchemy_monetdb.dialect import MonetDialect 

17 

18logger = log.getLogger(__name__) 

19 

20 

21class MonetDBHandler(DatabaseHandler): 

22 name = "monetdb" 

23 

24 def __init__(self, name: str, connection_data: Optional[dict], **kwargs): 

25 """Initialize the handler 

26 Args: 

27 name (str): name of particular handler instance 

28 connection_data (dict): parameters for connecting to the database 

29 **kwargs: arbitrary keyword arguments. 

30 """ 

31 super().__init__(name) 

32 

33 self.kwargs = kwargs 

34 self.parser = parse_sql 

35 self.database = connection_data["database"] 

36 self.user = connection_data["user"] 

37 self.password = connection_data["password"] 

38 self.schemaName = connection_data["schema_name"] if "schema_name" in connection_data else None 

39 self.host = connection_data["host"] 

40 self.port = connection_data["port"] 

41 

42 self.connection = None 

43 self.is_connected = False 

44 

45 def connect(self): 

46 """Set up any connections required by the handler 

47 Should return output of check_connection() method after attempting 

48 connection. Should switch self.is_connected. 

49 Returns: 

50 Connection Object 

51 """ 

52 if self.is_connected is True: 

53 return self.connection 

54 

55 try: 

56 self.connection = mdb.connect( 

57 database=self.database, 

58 hostname=self.host, 

59 port=self.port, 

60 username=self.user, 

61 password=self.password, 

62 ) 

63 

64 self.is_connected = True 

65 except Exception as e: 

66 logger.error(f"Error while connecting to {self.database}, {e}") 

67 

68 return self.connection 

69 

70 def disconnect(self): 

71 """Close any existing connections 

72 Should switch self.is_connected. 

73 """ 

74 if self.is_connected is False: 

75 return 

76 try: 

77 self.connection.close() 

78 self.is_connected = False 

79 except Exception as e: 

80 logger.error(f"Error while disconnecting to {self.database}, {e}") 

81 

82 return 

83 

84 def check_connection(self) -> StatusResponse: 

85 """Check connection to the handler 

86 Returns: 

87 HandlerStatusResponse 

88 """ 

89 responseCode = StatusResponse(False) 

90 need_to_close = self.is_connected is False 

91 

92 try: 

93 self.connect() 

94 responseCode.success = True 

95 except Exception as e: 

96 logger.error(f"Error connecting to database {self.database}, {e}!") 

97 responseCode.error_message = str(e) 

98 finally: 

99 if responseCode.success is True and need_to_close: 

100 self.disconnect() 

101 if responseCode.success is False and self.is_connected is True: 

102 self.is_connected = False 

103 

104 return responseCode 

105 

106 def native_query(self, query: str) -> StatusResponse: 

107 """Receive raw query and act upon it somehow. 

108 Args: 

109 query (Any): query in native format (str for sql databases, 

110 etc) 

111 Returns: 

112 HandlerResponse 

113 """ 

114 need_to_close = self.is_connected is False 

115 conn = self.connect() 

116 cur = conn.cursor() 

117 try: 

118 cur.execute(query) 

119 

120 if len(cur._rows) > 0: 

121 result = cur.fetchall() 

122 response = Response( 

123 RESPONSE_TYPE.TABLE, 

124 data_frame=pd.DataFrame(result, columns=[x[0] for x in cur.description]), 

125 ) 

126 else: 

127 response = Response(RESPONSE_TYPE.OK) 

128 self.connection.commit() 

129 except Exception as e: 

130 logger.error(f"Error running query: {query} on {self.database}!") 

131 response = Response(RESPONSE_TYPE.ERROR, error_message=str(e)) 

132 self.connection.rollback() 

133 

134 cur.close() 

135 

136 if need_to_close is True: 

137 self.disconnect() 

138 

139 return response 

140 

141 def query(self, query: ASTNode) -> StatusResponse: 

142 """Receive query as AST (abstract syntax tree) and act upon it somehow. 

143 Args: 

144 query (ASTNode): sql query represented as AST. May be any kind 

145 of query: SELECT, INTSERT, DELETE, etc 

146 Returns: HandlerResponse 

147 """ 

148 

149 renderer = SqlalchemyRender(MonetDialect) 

150 query_str = renderer.get_string(query, with_failback=True) 

151 return self.native_query(query_str) 

152 

153 def get_tables(self) -> StatusResponse: 

154 """Return list of entities 

155 Return list of entities that will be accesible as tables. 

156 Returns: HandlerResponse: shoud have same columns as information_schema.tables 

157 (https://dev.mysql.com/doc/refman/8.0/en/information-schema-tables-table.html) 

158 Column 'TABLE_NAME' is mandatory, other is optional. 

159 """ 

160 self.connect() 

161 schema = schema_id(connection=self.connection, schema_name=self.schemaName) 

162 

163 q = f""" 

164 SELECT name as TABLE_NAME 

165 FROM sys.tables 

166 WHERE system = False 

167 AND type = 0 

168 AND schema_id = {schema} 

169 """ 

170 

171 return self.query(q) 

172 

173 def get_columns(self, table_name: str) -> StatusResponse: 

174 """Returns a list of entity columns 

175 Args: 

176 table_name (str): name of one of tables returned by self.get_tables() 

177 Returns: 

178 HandlerResponse: shoud have same columns as information_schema.columns 

179 (https://dev.mysql.com/doc/refman/8.0/en/information-schema-columns-table.html) 

180 Column 'COLUMN_NAME' is mandatory, other is optional. Hightly 

181 recomended to define also 'DATA_TYPE': it should be one of 

182 python data types (by default it str). 

183 """ 

184 self.connect() 

185 table = table_id( 

186 connection=self.connection, 

187 table_name=table_name, 

188 schema_name=self.schemaName, 

189 ) 

190 

191 q = f""" 

192 SELECT 

193 name as COLUMN_NAME, 

194 type as DATA_TYPE 

195 FROM sys.columns 

196 WHERE table_id = {table} 

197 """ 

198 return self.query(q)