Coverage for mindsdb / integrations / handlers / altibase_handler / altibase_handler.py: 0%
121 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3import jaydebeapi as jdbcconnector
4from mindsdb_sql_parser import parse_sql
5from mindsdb_sql_parser.ast.base import ASTNode
6import pandas as pd
7import pyodbc
8import numpy as np
10from mindsdb.integrations.libs.base import DatabaseHandler
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13 HandlerResponse as Response,
14 RESPONSE_TYPE
15)
16from mindsdb.utilities import log
18logger = log.getLogger(__name__)
class AltibaseHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Altibase statements.

    Two transports are supported: ODBC (through ``pyodbc``) when a ``dsn`` is
    present in the connection data, and JDBC (through ``jaydebeapi``) otherwise.
    """

    name = 'altibase'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """ constructor
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database
        """
        super().__init__(name)

        self.parser = parse_sql

        # connection_data is annotated Optional; fall back to an empty dict so
        # the lookups below do not fail with AttributeError when it is None.
        self.connection_args = connection_data if connection_data is not None else {}
        self.database = self.connection_args.get('database')
        self.host = self.connection_args.get('host')
        self.port = self.connection_args.get('port')
        self.user = self.connection_args.get('user')
        self.password = self.connection_args.get('password')
        self.dsn = self.connection_args.get('dsn')

        self.connection = None
        self.is_connected = False

    def connect(self):
        """ Return an open connection, creating one if necessary
        Reuses the cached connection when self.is_connected is already True.
        Otherwise connects over ODBC when a DSN is configured, else over JDBC.
        Returns:
            connection
        Raises:
            Exception: whatever the underlying driver raises on failure
        """
        if self.is_connected is True:
            return self.connection

        if self.dsn:
            return self.connect_with_odbc()
        return self.connect_with_jdbc()

    def connect_with_odbc(self):
        """ Open a connection through pyodbc using the configured DSN
        Sets self.connection and switches self.is_connected on success.
        Returns:
            connection
        Raises:
            Exception: the pyodbc error is logged and re-raised
        """
        conn_parts = [f"DSN={self.dsn}"]

        # Optional attributes are appended only when they were configured.
        optional_parts = (
            ("Server", self.host),
            ("Port", self.port),
            ("User", self.user),
            ("Password", self.password),
        )
        conn_parts.extend(f"{key}={value}" for key, value in optional_parts if value)

        conn_str = ';'.join(conn_parts)

        try:
            self.connection = pyodbc.connect(conn_str, timeout=10)
            self.is_connected = True
        except Exception as e:
            logger.error(f"Error while connecting to {self.database}, {e}")
            # Re-raise (as connect_with_jdbc does): swallowing the error made
            # check_connection() report success and left callers holding None.
            raise

        return self.connection

    def connect_with_jdbc(self):
        """ Open a connection through jaydebeapi (JDBC)
        Reads the optional 'jar_location' (comma separated list of jars) and
        'jdbc_class' entries from the connection data. Sets self.connection
        and switches self.is_connected on success.
        Returns:
            connection
        Raises:
            Exception: the driver error is logged and re-raised
        """
        jar_location = self.connection_args.get('jar_location')

        jdbc_class = self.connection_args.get('jdbc_class', 'Altibase.jdbc.driver.AltibaseDriver')
        jdbc_url = f"jdbc:Altibase://{self.host}:{self.port}/{self.database}"

        # Only pass the optional arguments that were actually configured.
        connect_kwargs = {}
        if self.user and self.password:
            connect_kwargs['driver_args'] = [self.user, self.password]
        if jar_location:
            connect_kwargs['jars'] = str(jar_location).split(",")

        try:
            self.connection = jdbcconnector.connect(
                jclassname=jdbc_class,
                url=jdbc_url,
                **connect_kwargs
            )
            self.is_connected = True
        except Exception as e:
            logger.error(f"Error while connecting to {self.database}, {e}")
            raise

        return self.connection

    def disconnect(self):
        """ Close any existing connections
        Switches self.is_connected to False when the close succeeds.
        Returns:
            bool: False when closing raised an error, True otherwise
                  (including when there was nothing to close)
        """
        if self.is_connected is True:
            try:
                self.connection.close()
                self.is_connected = False
            except Exception as e:
                logger.error(f"Error while disconnecting to {self.database}, {e}")
                return False
        return True

    def check_connection(self) -> StatusResponse:
        """ Check connection to the handler
        A connection opened only for this check is closed again afterwards.
        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(success=False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to database {self.database}, {e}!')
            response.error_message = str(e)
        finally:
            if response.success and need_to_close:
                self.disconnect()
            if not response.success and self.is_connected:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """Receive raw query and act upon it somehow.
        Statements that produce a result set return a TABLE response, other
        statements return OK. Errors raised while executing are rolled back
        and returned as an ERROR response.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse
        Raises:
            Exception: when the connection itself cannot be established
        """
        need_to_close = self.is_connected is False
        connection = self.connect()

        try:
            with connection.cursor() as cur:
                try:
                    cur.execute(query)
                    if cur.description:
                        columns = [column[0] for column in cur.description]
                        # Convert driver rows to plain tuples so pandas builds
                        # one column per field and keeps per-column types;
                        # going through np.array() coerced every value of a
                        # mixed-type row to a single dtype.
                        rows = [tuple(row) for row in cur.fetchall()]
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            data_frame=pd.DataFrame(rows, columns=columns)
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                    connection.commit()
                except Exception as e:
                    logger.error(f'Error running query: {query} on {self.database}, {e}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(e)
                    )
                    connection.rollback()
        finally:
            # Always release a connection that was opened just for this call,
            # even when the cursor or the rollback raised.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """Receive query as AST (abstract syntax tree) and act upon it somehow.
        The AST is rendered to a string and passed to native_query().
        Args:
            query (ASTNode): sql query represented as AST. May be any kind of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        if isinstance(query, ASTNode):
            query_str = query.to_string()
        else:
            query_str = str(query)

        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """ Return list of entities
        Return list of entities that will be accessible as tables: the rows of
        system_.sys_tables_ owned by the current user.
        Returns:
            HandlerResponse
        """
        query = '''
            SELECT
                TABLE_NAME,
                TABLE_ID,
                TABLE_TYPE
            FROM
                system_.sys_tables_
            WHERE
                user_id = USER_ID();
        '''

        return self.native_query(query)

    def get_columns(self, table_name: str) -> Response:
        """ Returns a list of entity columns
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse
        """
        # The name is embedded in a string literal, so double any single quote
        # to keep it from terminating the literal (SQL injection guard).
        # NOTE(review): capitalize() upper-cases only the first character;
        # confirm this matches how Altibase stores table names.
        safe_table_name = table_name.capitalize().replace("'", "''")

        query = f"""
            SELECT
                COLUMN_NAME,
                DATA_TYPE
            FROM
                system_.sys_columns_ ct
            inner join
                system_.sys_tables_ tt
                on ct.table_id=tt.table_id
            where
                tt.table_name = '{safe_table_name}';
        """

        return self.native_query(query)