Coverage for mindsdb / integrations / handlers / monetdb_handler / monetdb_handler.py: 0%
91 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
2from mindsdb_sql_parser.ast.base import ASTNode
3from mindsdb.integrations.libs.base import DatabaseHandler
4from mindsdb.utilities import log
5from mindsdb_sql_parser import parse_sql
6from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
7from mindsdb.integrations.libs.response import (
8 HandlerStatusResponse as StatusResponse,
9 HandlerResponse as Response,
10 RESPONSE_TYPE,
11)
13import pandas as pd
14import pymonetdb as mdb
15from .utils.monet_get_id import schema_id, table_id
16from sqlalchemy_monetdb.dialect import MonetDialect
18logger = log.getLogger(__name__)
class MonetDBHandler(DatabaseHandler):
    """Database handler that runs queries against MonetDB through pymonetdb.

    A single connection is cached on the instance (``self.connection``) and
    tracked with ``self.is_connected``. Methods that open a connection only to
    serve one call close it again before returning.
    """

    name = "monetdb"

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """Initialize the handler.

        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                The keys "database", "user", "password", "host" and "port"
                are read unconditionally; "schema_name" is optional.
            **kwargs: arbitrary keyword arguments.

        Raises:
            KeyError: if one of the required connection parameters is missing.
        """
        super().__init__(name)

        self.kwargs = kwargs
        self.parser = parse_sql
        self.database = connection_data["database"]
        self.user = connection_data["user"]
        self.password = connection_data["password"]
        # Optional: stays None when the caller did not supply a schema.
        self.schemaName = connection_data.get("schema_name")
        self.host = connection_data["host"]
        self.port = connection_data["port"]

        self.connection = None
        self.is_connected = False

    def connect(self):
        """Open the connection to MonetDB, reusing the cached one if present.

        Switches ``self.is_connected`` to True on success.

        Returns:
            Connection Object

        Raises:
            Exception: whatever ``pymonetdb.connect`` raised. The error is
                logged and re-raised so that callers (notably
                ``check_connection``) can actually observe the failure instead
                of receiving a ``None`` connection.
        """
        if self.is_connected is True:
            return self.connection

        try:
            self.connection = mdb.connect(
                database=self.database,
                hostname=self.host,
                port=self.port,
                username=self.user,
                password=self.password,
            )
        except Exception as e:
            logger.error(f"Error while connecting to {self.database}, {e}")
            raise

        self.is_connected = True
        return self.connection

    def disconnect(self):
        """Close the cached connection, if any.

        Switches ``self.is_connected`` to False when the close succeeds.
        Errors raised while closing are logged and not propagated.
        """
        if self.is_connected is False:
            return

        try:
            self.connection.close()
            self.is_connected = False
        except Exception as e:
            logger.error(f"Error while disconnecting to {self.database}, {e}")

    def check_connection(self) -> StatusResponse:
        """Check connection to the handler.

        Returns:
            HandlerStatusResponse: ``success`` is True when a connection could
            be established (or was already open); otherwise ``error_message``
            carries the text of the connection error.
        """
        response = StatusResponse(False)
        # Only close the connection afterwards if this call was the one that opened it.
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to database {self.database}, {e}!")
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """Receive raw query and act upon it somehow.

        Args:
            query (str): query in native format (SQL text for MonetDB)

        Returns:
            HandlerResponse: a TABLE response holding a DataFrame when the
            statement produced a result set (possibly with zero rows), an OK
            response for statements without a result set, or an ERROR response
            carrying the error text when connecting or executing failed.
        """
        need_to_close = self.is_connected is False

        try:
            conn = self.connect()
        except Exception as e:
            # connect() has already logged the details.
            return Response(RESPONSE_TYPE.ERROR, error_message=str(e))

        cur = None
        try:
            cur = conn.cursor()
            cur.execute(query)

            # DB-API: description is None when the statement returned no
            # result set. Unlike peeking at the cursor's private row buffer,
            # this keeps column names for SELECTs that match zero rows.
            if cur.description is not None:
                result = cur.fetchall()
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(result, columns=[x[0] for x in cur.description]),
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
            conn.commit()
        except Exception as e:
            logger.error(f"Error running query: {query} on {self.database}!")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
            conn.rollback()
        finally:
            # Always release the cursor and any connection opened just for
            # this call, even if rollback itself raised.
            if cur is not None:
                cur.close()
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """Receive query as AST (abstract syntax tree) and act upon it somehow.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc

        Returns:
            HandlerResponse: the result of running the rendered SQL through
            ``native_query``.
        """
        renderer = SqlalchemyRender(MonetDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """Return list of entities that will be accessible as tables.

        Returns:
            HandlerResponse: should have same columns as information_schema.tables
            (https://dev.mysql.com/doc/refman/8.0/en/information-schema-tables-table.html)
            Column 'TABLE_NAME' is mandatory, other is optional.
        """
        connection = self.connect()
        # NOTE(review): schema_id is a project helper; its result is
        # interpolated as the sys.tables.schema_id filter value - presumably a
        # numeric id looked up from self.schemaName. Confirm in utils.
        schema = schema_id(connection=connection, schema_name=self.schemaName)

        q = f"""
            SELECT name as TABLE_NAME
            FROM sys.tables
            WHERE system = False
            AND type = 0
            AND schema_id = {schema}
        """

        # q is already SQL text, so it goes straight to native_query();
        # query() expects an AST node to render.
        return self.native_query(q)

    def get_columns(self, table_name: str) -> Response:
        """Return a list of entity columns.

        Args:
            table_name (str): name of one of tables returned by self.get_tables()

        Returns:
            HandlerResponse: should have same columns as information_schema.columns
            (https://dev.mysql.com/doc/refman/8.0/en/information-schema-columns-table.html)
            Column 'COLUMN_NAME' is mandatory, other is optional. Highly
            recommended to define also 'DATA_TYPE': it should be one of
            python data types (by default it str).
        """
        connection = self.connect()
        # NOTE(review): table_id is a project helper; its result is used as
        # the sys.columns.table_id filter value - confirm it returns a single id.
        table = table_id(
            connection=connection,
            table_name=table_name,
            schema_name=self.schemaName,
        )

        q = f"""
            SELECT
                name as COLUMN_NAME,
                type as DATA_TYPE
            FROM sys.columns
            WHERE table_id = {table}
        """
        # Raw SQL text: execute directly rather than through the AST renderer.
        return self.native_query(q)