Coverage for mindsdb / integrations / handlers / matrixone_handler / matrixone_handler.py: 0%
96 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3import pandas as pd
4import pymysql as matone
5from pymysql.cursors import DictCursor as dict
6from mindsdb_sql_parser import parse_sql
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb_sql_parser.ast.base import ASTNode
10from mindsdb.utilities import log
11from mindsdb.integrations.libs.base import DatabaseHandler
12from mindsdb.integrations.libs.response import (
13 HandlerStatusResponse as StatusResponse,
14 HandlerResponse as Response,
15 RESPONSE_TYPE
16)
19logger = log.getLogger(__name__)
class MatrixOneHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the MatrixOne statements.

    The connection is made with ``pymysql`` (imported in this module as
    ``matone``) and AST queries are rendered with the ``mysql`` dialect.
    """

    name = 'matrixone'

    def __init__(self, name, connection_data: Optional[dict], **kwargs):
        """
        Store the connection settings; no connection is opened here.

        :param name: handler instance name, forwarded to the base class
        :param connection_data: mapping read for the keys ``host``, ``port``,
            ``user``, ``password``, ``database`` and the optional ``ssl``,
            ``ssl_ca``, ``ssl_cert``, ``ssl_key``. ``None`` is treated as an
            empty mapping.
        :param kwargs: accepted for interface compatibility and ignored
        """
        # NOTE: this module imports pymysql's DictCursor under the name
        # ``dict``, so the annotation above does not name the builtin type.
        super().__init__(name)
        self.mysql_url = None
        self.parser = parse_sql
        self.dialect = 'mysql'
        # Fall back to an empty mapping so the ``.get`` lookups in this class
        # keep working when no connection data was supplied.
        self.connection_data = connection_data if connection_data is not None else {}
        self.database = self.connection_data.get('database')

        self.connection = None
        self.is_connected = False

    def connect(self):
        """
        Open a connection to MatrixOne, or reuse the cached one.

        :return: the pymysql connection object
        """
        # Reuse the cached connection only while it still reports itself as
        # open; otherwise fall through and build a fresh one.
        if self.is_connected is True and self.connection is not None and self.connection.open:
            return self.connection

        config = {
            'host': self.connection_data.get('host'),
            'port': self.connection_data.get('port'),
            'user': self.connection_data.get('user'),
            'password': self.connection_data.get('password'),
            'database': self.connection_data.get('database')
        }

        use_ssl = self.connection_data.get('ssl')
        if use_ssl is True:
            # pymysql has no ``constants.ClientFlag`` and no ``client_flags``
            # argument (those are mysql-connector names). TLS is requested
            # through the ``ssl_*`` / ``ssl`` connect arguments instead.
            tls_files = {}
            for key in ('ssl_ca', 'ssl_cert', 'ssl_key'):
                value = self.connection_data.get(key)
                if value is not None:
                    tls_files[key] = value

            if tls_files:
                config.update(tls_files)
            else:
                # ssl was requested without any certificate files: hand
                # pymysql a default TLS context so the request is honoured.
                import ssl as ssl_lib
                config['ssl'] = ssl_lib.create_default_context()

        connection = matone.connect(**config)
        self.is_connected = True
        self.connection = connection
        return self.connection

    def disconnect(self):
        """
        Close the current connection, if any, and mark the handler as
        disconnected.
        """
        if self.is_connected is False:
            return
        try:
            self.connection.close()
        finally:
            # Clear the flag even when close() raises, so that a later
            # connect() does not hand back the broken connection.
            self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """
        Check the connection of the MatrixOne database
        :return: success status and error message if error occurs
        """

        result = StatusResponse(False)
        # Remember whether this call had to open the connection itself, so it
        # can be closed again afterwards.
        need_to_close = self.is_connected is False

        try:
            connection = self.connect()
            result.success = connection.open
        except Exception as e:
            # ``.get`` keeps the log call from raising when no database name
            # was configured.
            logger.error(f'Error connecting to MatrixOne {self.connection_data.get("database")}, {e}!')
            result.error_message = str(e)

        if result.success is True and need_to_close:
            self.disconnect()
        if result.success is False and self.is_connected is True:
            self.is_connected = False

        return result

    def native_query(self, query: str) -> Response:
        """
        Receive SQL query and runs it
        :param query: The SQL query to run in MatrixOne
        :return: a TABLE response holding the fetched rows for statements that
            produce a result set (even an empty one), an OK response for
            statements that do not, or an ERROR response carrying the
            exception text
        """

        need_to_close = self.is_connected is False

        connection = self.connect()
        try:
            # Same cursor class the module aliases as ``dict``; spelled out
            # here to avoid confusion with the builtin.
            with connection.cursor(cursor=matone.cursors.DictCursor) as cur:
                try:
                    cur.execute(query)
                    # ``description`` is set whenever the statement produced a
                    # result set, including one with zero rows.
                    if cur.description:
                        rows = cur.fetchall()
                        if rows:
                            data = pd.DataFrame(rows)
                        else:
                            # Keep the column names for an empty result.
                            data = pd.DataFrame(
                                columns=[col[0] for col in cur.description]
                            )
                        response = Response(RESPONSE_TYPE.TABLE, data)
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                    connection.commit()
                except Exception as e:
                    logger.error(f'Error running query: {query} on {self.connection_data.get("database")}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(e)
                    )
                    connection.rollback()
        finally:
            # Release a connection opened only for this call, even if the
            # rollback above raised.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Retrieve the data from the SQL statement.

        :param query: parsed statement, rendered to a ``mysql`` dialect string
            and passed to native_query()
        :return: the response produced by native_query()
        """
        renderer = SqlalchemyRender('mysql')
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables in MatrixOne

        :return: the ``SHOW TABLES`` response with its first column renamed to
            ``table_name``; returned unchanged when it carries no data frame
        """
        q = "SHOW TABLES;"
        result = self.native_query(q)
        df = result.data_frame
        # Presumably error responses carry no data frame; skip the rename
        # rather than fail on a missing frame.
        if df is not None and len(df.columns) > 0:
            result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
        return result

    def get_columns(self, table_name) -> Response:
        """
        Show details about the table

        :param table_name: table name, interpolated verbatim into the
            statement
        :return: the ``SHOW COLUMNS`` response with its first two columns
            renamed to ``COLUMN_NAME`` and ``DATA TYPE``; returned unchanged
            when it carries no usable data frame
        """
        # NOTE(review): table_name is not quoted or escaped here; confirm that
        # callers only pass trusted identifiers.
        q = f"SHOW COLUMNS FROM {table_name};"
        result = self.native_query(q)
        df = result.data_frame
        if df is not None and len(df.columns) >= 2:
            result.data_frame = df.rename(columns={
                df.columns[0]: 'COLUMN_NAME',
                df.columns[1]: 'DATA TYPE'
            })

        return result