Coverage for mindsdb / integrations / handlers / matrixone_handler / matrixone_handler.py: 0%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3import pandas as pd 

4import pymysql as matone 

5from pymysql.cursors import DictCursor as dict 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8from mindsdb_sql_parser.ast.base import ASTNode 

9 

10from mindsdb.utilities import log 

11from mindsdb.integrations.libs.base import DatabaseHandler 

12from mindsdb.integrations.libs.response import ( 

13 HandlerStatusResponse as StatusResponse, 

14 HandlerResponse as Response, 

15 RESPONSE_TYPE 

16) 

17 

18 

19logger = log.getLogger(__name__) 

20 

21 

22class MatrixOneHandler(DatabaseHandler): 

23 """ 

24 This handler handles connection and execution of the MatrixOne statements. 

25 """ 

26 

27 name = 'matrixone' 

28 

29 def __init__(self, name, connection_data: Optional[dict], **kwargs): 

30 super().__init__(name) 

31 self.mysql_url = None 

32 self.parser = parse_sql 

33 self.dialect = 'mysql' 

34 self.connection_data = connection_data 

35 self.database = self.connection_data.get('database') 

36 

37 self.connection = None 

38 self.is_connected = False 

39 

40 def connect(self): 

41 if self.is_connected is True: 

42 return self.connection 

43 

44 config = { 

45 'host': self.connection_data.get('host'), 

46 'port': self.connection_data.get('port'), 

47 'user': self.connection_data.get('user'), 

48 'password': self.connection_data.get('password'), 

49 'database': self.connection_data.get('database') 

50 } 

51 

52 ssl = self.connection_data.get('ssl') 

53 if ssl is True: 

54 ssl_ca = self.connection_data.get('ssl_ca') 

55 ssl_cert = self.connection_data.get('ssl_cert') 

56 ssl_key = self.connection_data.get('ssl_key') 

57 config['client_flags'] = [matone.constants.ClientFlag.SSL] 

58 if ssl_ca is not None: 

59 config["ssl_ca"] = ssl_ca 

60 if ssl_cert is not None: 

61 config["ssl_cert"] = ssl_cert 

62 if ssl_key is not None: 

63 config["ssl_key"] = ssl_key 

64 

65 connection = matone.connect(**config) 

66 self.is_connected = True 

67 self.connection = connection 

68 return self.connection 

69 

70 def disconnect(self): 

71 if self.is_connected is False: 

72 return 

73 self.connection.close() 

74 self.is_connected = False 

75 return 

76 

77 def check_connection(self) -> StatusResponse: 

78 """ 

79 Check the connection of the MatrixOne database 

80 :return: success status and error message if error occurs 

81 """ 

82 

83 result = StatusResponse(False) 

84 need_to_close = self.is_connected is False 

85 

86 try: 

87 connection = self.connect() 

88 result.success = connection.open 

89 except Exception as e: 

90 logger.error(f'Error connecting to MatrixOne {self.connection_data["database"]}, {e}!') 

91 result.error_message = str(e) 

92 

93 if result.success is True and need_to_close: 

94 self.disconnect() 

95 if result.success is False and self.is_connected is True: 

96 self.is_connected = False 

97 

98 return result 

99 

100 def native_query(self, query: str) -> Response: 

101 """ 

102 Receive SQL query and runs it 

103 :param query: The SQL query to run in MatrixOne 

104 :return: returns the records from the current recordset 

105 """ 

106 

107 need_to_close = self.is_connected is False 

108 

109 connection = self.connect() 

110 with connection.cursor(cursor=dict) as cur: 

111 try: 

112 cur.execute(query) 

113 if cur._rows: 

114 result = cur.fetchall() 

115 response = Response( 

116 RESPONSE_TYPE.TABLE, 

117 pd.DataFrame( 

118 result, 

119 # columns=[x[0] for x in cur.description] 

120 ) 

121 ) 

122 else: 

123 response = Response(RESPONSE_TYPE.OK) 

124 connection.commit() 

125 except Exception as e: 

126 logger.error(f'Error running query: {query} on {self.connection_data["database"]}!') 

127 response = Response( 

128 RESPONSE_TYPE.ERROR, 

129 error_message=str(e) 

130 ) 

131 connection.rollback() 

132 

133 if need_to_close is True: 

134 self.disconnect() 

135 

136 return response 

137 

138 def query(self, query: ASTNode) -> Response: 

139 """ 

140 Retrieve the data from the SQL statement. 

141 """ 

142 renderer = SqlalchemyRender('mysql') 

143 query_str = renderer.get_string(query, with_failback=True) 

144 return self.native_query(query_str) 

145 

146 def get_tables(self) -> Response: 

147 """ 

148 Get a list with all of the tabels in MatrixOne 

149 """ 

150 q = "SHOW TABLES;" 

151 result = self.native_query(q) 

152 df = result.data_frame 

153 result.data_frame = df.rename(columns={df.columns[0]: 'table_name'}) 

154 return result 

155 

156 def get_columns(self, table_name) -> Response: 

157 """ 

158 Show details about the table 

159 """ 

160 q = f"SHOW COLUMNS FROM {table_name};" 

161 result = self.native_query(q) 

162 df = result.data_frame 

163 result.data_frame = df.rename(columns={ 

164 df.columns[0]: 'COLUMN_NAME', 

165 df.columns[1]: 'DATA TYPE' 

166 }) 

167 

168 return result