Coverage for mindsdb / integrations / handlers / altibase_handler / altibase_handler.py: 0%

121 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3import jaydebeapi as jdbcconnector 

4from mindsdb_sql_parser import parse_sql 

5from mindsdb_sql_parser.ast.base import ASTNode 

6import pandas as pd 

7import pyodbc 

8import numpy as np 

9 

10from mindsdb.integrations.libs.base import DatabaseHandler 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13 HandlerResponse as Response, 

14 RESPONSE_TYPE 

15) 

16from mindsdb.utilities import log 

17 

18logger = log.getLogger(__name__) 

19 

20 

class AltibaseHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Altibase statements.

    The connection is opened through ODBC (pyodbc) when a ``dsn`` is present
    in the connection data, and through JDBC (jaydebeapi) otherwise.
    """

    name = 'altibase'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """ constructor
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database
        """
        super().__init__(name)

        self.parser = parse_sql

        # connection_data is annotated Optional: fall back to an empty dict so
        # the .get() lookups here and in connect_with_jdbc() do not fail on None.
        self.connection_args = connection_data if connection_data is not None else {}
        self.database = self.connection_args.get('database')
        self.host = self.connection_args.get('host')
        self.port = self.connection_args.get('port')
        self.user = self.connection_args.get('user')
        self.password = self.connection_args.get('password')
        self.dsn = self.connection_args.get('dsn')

        self.connection = None
        self.is_connected = False

    def connect(self):
        """ Return an open connection, creating one if necessary.

        An already opened connection is reused. Otherwise ODBC is used when a
        DSN was configured and JDBC in every other case.

        Returns:
            connection
        Raises:
            Exception: whatever the underlying driver raises when connecting fails.
        """
        if self.is_connected is True:
            return self.connection

        if self.dsn:
            return self.connect_with_odbc()
        return self.connect_with_jdbc()

    def connect_with_odbc(self):
        """ Open a connection through pyodbc using the configured DSN.

        Host, port, user and password are appended to the connection string
        only when they are set. Sets self.connection and self.is_connected on
        success.

        Returns:
            connection
        Raises:
            Exception: the driver error is logged and re-raised, so that callers
                (check_connection, native_query) never receive a None connection.
        """
        conn_parts = [f"DSN={self.dsn}"]
        optional_parts = (
            ("Server", self.host),
            ("Port", self.port),
            ("User", self.user),
            ("Password", self.password),
        )
        conn_parts.extend(f"{key}={value}" for key, value in optional_parts if value)
        conn_str = ';'.join(conn_parts)

        try:
            self.connection = pyodbc.connect(conn_str, timeout=10)
            self.is_connected = True
        except Exception as e:
            logger.error(f"Error while connecting to {self.database}, {e}")
            # Same contract as connect_with_jdbc(): a failed connect must not
            # be reported as a success by check_connection().
            raise

        return self.connection

    def connect_with_jdbc(self):
        """ Open a connection through jaydebeapi (JDBC).

        Optional connection parameters read from the connection data:
            jar_location: comma separated list of jar files passed to the driver
            jdbc_class: driver class name, 'Altibase.jdbc.driver.AltibaseDriver' by default

        Sets self.connection and self.is_connected on success.

        Returns:
            connection
        Raises:
            Exception: the driver error is logged and re-raised.
        """
        jar_location = self.connection_args.get('jar_location')

        jdbc_class = self.connection_args.get('jdbc_class', 'Altibase.jdbc.driver.AltibaseDriver')
        jdbc_url = f"jdbc:Altibase://{self.host}:{self.port}/{self.database}"

        # Only pass the optional arguments that are actually configured.
        # Credentials are used only when both user and password are set.
        connect_kwargs = {}
        if self.user and self.password:
            connect_kwargs['driver_args'] = [self.user, self.password]
        if jar_location:
            connect_kwargs['jars'] = str(jar_location).split(",")

        try:
            self.connection = jdbcconnector.connect(jclassname=jdbc_class, url=jdbc_url, **connect_kwargs)
            self.is_connected = True
        except Exception as e:
            logger.error(f"Error while connecting to {self.database}, {e}")
            raise

        return self.connection

    def disconnect(self):
        """ Close any existing connections
        Should switch self.is_connected.

        Returns:
            bool: False when closing the connection failed, True otherwise
                (including when there was nothing to close).
        """
        if self.is_connected is True:
            try:
                self.connection.close()
                self.is_connected = False
            except Exception as e:
                logger.error(f"Error while disconnecting to {self.database}, {e}")
                return False
        return True

    def check_connection(self) -> StatusResponse:
        """ Check connection to the handler

        A connection opened only for this check is closed again afterwards.

        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(success=False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to database {self.database}, {e}!')
            response.error_message = str(e)
        finally:
            if response.success and need_to_close:
                self.disconnect()
            if not response.success and self.is_connected:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """Receive raw query and act upon it somehow.

        Statements that produce a result set are returned as a TABLE response,
        other statements as OK. The transaction is committed on success and
        rolled back on error; errors raised while executing are returned as an
        ERROR response instead of being raised.

        Args:
            query (str): query in native format
        Returns:
            HandlerResponse
        """
        need_to_close = self.is_connected is False
        connection = self.connect()

        try:
            with connection.cursor() as cur:
                try:
                    cur.execute(query)
                    if cur.description:
                        result = cur.fetchall()

                        # ODBC path only: non-empty results are converted to an
                        # array before building the frame.
                        if self.dsn and len(result) > 0:
                            result = np.array(result)

                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            data_frame=pd.DataFrame(
                                result,
                                columns=[x[0] for x in cur.description]
                            )
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                    connection.commit()
                except Exception as e:
                    logger.error(f'Error running query: {query} on {self.database}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(e)
                    )
                    connection.rollback()
        finally:
            # Release a connection opened just for this query even when the
            # rollback or the cursor cleanup itself fails.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        if isinstance(query, ASTNode):
            query_str = query.to_string()
        else:
            query_str = str(query)

        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """ Return list of entities
        Return list of entities that will be accessible as tables.
        Returns:
            HandlerResponse
        """
        query = '''
            SELECT
                TABLE_NAME,
                TABLE_ID,
                TABLE_TYPE
            FROM
                system_.sys_tables_
            WHERE
                user_id = USER_ID();
        '''

        return self.native_query(query)

    def get_columns(self, table_name: str) -> Response:
        """ Returns a list of entity columns
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse
        """
        # The name is embedded in a SQL string literal: double any single quote
        # so that it cannot terminate the literal.
        # NOTE(review): capitalize() only upper-cases the first character and
        # lower-cases the rest; confirm this matches how table names are stored
        # in system_.sys_tables_.
        safe_table_name = table_name.capitalize().replace("'", "''")

        query = f"""
            SELECT
                COLUMN_NAME,
                DATA_TYPE
            FROM
                system_.sys_columns_ ct
            inner join
                system_.sys_tables_ tt
                on ct.table_id=tt.table_id
            where
                tt.table_name = '{safe_table_name}';
        """

        return self.native_query(query)