Coverage for mindsdb / integrations / handlers / nuo_jdbc_handler / nuo_jdbc_handler.py: 0%
112 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
2from mindsdb_sql_parser.ast.base import ASTNode
3from mindsdb.integrations.libs.base import DatabaseHandler
4from mindsdb.utilities import log
5from mindsdb_sql_parser import parse_sql
6from mindsdb.integrations.libs.response import (
7 HandlerStatusResponse as StatusResponse,
8 HandlerResponse as Response,
9 RESPONSE_TYPE
10)
11import pandas as pd
12import jaydebeapi as jdbcconnector
14logger = log.getLogger(__name__)
17class NuoHandler(DatabaseHandler):
19 name = 'nuo_jdbc'
def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
    """Initialize the handler.

    Args:
        name (str): name of particular handler instance
        connection_data (dict): parameters for connecting to the database.
            'database', 'host', 'user' and 'password' are mandatory;
            'port' and 'is_direct' are optional here, and further optional
            keys are read later from the stored config.
        **kwargs: arbitrary keyword arguments.

    Raises:
        KeyError: if one of the mandatory connection parameters is missing.
    """
    super().__init__(name)

    # The annotation allows None; normalise it so a missing parameter is
    # reported as a KeyError naming the key instead of an opaque TypeError.
    if connection_data is None:
        connection_data = {}

    self.kwargs = kwargs
    self.parser = parse_sql
    self.connection_config = connection_data

    # Mandatory parameters: fail fast when they are absent.
    self.database = connection_data['database']
    self.host = connection_data['host']
    self.user = connection_data['user']
    self.password = connection_data['password']

    # Optional parameters: construct_jdbc_url() already treats both as
    # optional, so their absence must not raise here.
    self.port = connection_data.get('port')
    self.is_direct = connection_data.get('is_direct')

    self.connection = None
    self.is_connected = False
    self.schema = None

    # Built last because it reads the attributes assigned above.
    self.jdbc_url = self.construct_jdbc_url()
def connect(self):
    """Open the JDBC connection, reusing the current one when connected.

    Sets self.is_connected to True once a connection has been established.

    Returns:
        The connection object returned by the JDBC connector.

    Raises:
        Exception: whatever the JDBC connector raised while connecting; the
            error is logged and re-raised so callers such as
            check_connection() can observe the failure.
    """
    if self.is_connected is True:
        return self.connection

    connect_kwargs = {
        'jclassname': "com.nuodb.jdbc.Driver",
        'url': self.jdbc_url,
    }
    # The driver jar location is optional; pass it only when configured.
    jar_location = self.connection_config.get('jar_location')
    if jar_location:
        connect_kwargs['jars'] = jar_location

    try:
        self.connection = jdbcconnector.connect(**connect_kwargs)
    except Exception as e:
        logger.error(f"Error while connecting to {self.database}, {e}")
        raise

    # Record the state so disconnect() actually closes this connection.
    self.is_connected = True
    return self.connection
def construct_jdbc_url(self):
    """Construct the JDBC URL from the parameters provided to the handler.

    Side effect: when the connection config contains a 'schema' entry it is
    also stored on self.schema.

    Returns:
        str: The JDBC connection URL.
    """
    config = self.connection_config

    jdbc_url = f"jdbc:com.nuodb://{self.host}"

    # port is an optional parameter; append it only when provided
    port = config.get('port')
    if port:
        jdbc_url += f":{port}"

    # NOTE(review): user and password are inserted verbatim (no URL
    # encoding), exactly as before -- confirm how the driver treats
    # reserved characters such as '&' in credentials.
    jdbc_url += f"/{self.database}?user={self.user}&password={self.password}"

    # if a schema is provided in the connection args, remember it and use
    # it to establish the connection
    schema = config.get('schema')
    if schema:
        self.schema = schema
        jdbc_url += f"&schema={schema}"

    # the direct parameter is only set when it is specified to be true
    if str(self.is_direct).lower() == 'true':
        jdbc_url += "&direct=true"

    # driver_args is a comma separated list of key=value pairs which is
    # appended to the URL as &key=value&key2=value2; surrounding whitespace
    # and empty entries (e.g. from a trailing comma) are dropped so they do
    # not produce malformed parameters
    driver_args = config.get('driver_args')
    if driver_args:
        extra_args = [arg.strip() for arg in driver_args.split(",")]
        extra_args = [arg for arg in extra_args if arg]
        if extra_args:
            jdbc_url += "&" + "&".join(extra_args)

    return jdbc_url
def disconnect(self):
    """Close the existing connection, if any, and mark the handler as disconnected.

    A failure while closing is logged, not raised. In every case the handler
    ends up with is_connected set to False and no connection object, so a
    later connect() opens a fresh connection instead of reusing a closed or
    broken one.
    """
    # Nothing was opened (or it has already been released).
    if self.connection is None:
        self.is_connected = False
        return

    try:
        self.connection.close()
    except Exception as e:
        logger.error(f"Error while disconnecting to {self.database}, {e}")
    finally:
        # Drop the reference even when close() failed: the object must not
        # be handed out again by connect().
        self.is_connected = False
        self.connection = None
def check_connection(self) -> StatusResponse:
    """Check the connection to the database.

    Attempts to connect and reports the outcome. A connection opened only
    for this check is closed again before returning.

    Returns:
        HandlerStatusResponse: success is True when a connection object was
        obtained; otherwise error_message holds the failure reason.
    """
    status = StatusResponse(False)
    need_to_close = self.is_connected is False

    try:
        connection = self.connect()
        # Guard against a connect() that yields no connection object
        # without raising: that must not be reported as a success.
        if connection is None:
            raise ConnectionError(f'No connection could be established to {self.database}')
        status.success = True
    except Exception as e:
        logger.error(f'Error connecting to database {self.database}, {e}!')
        status.error_message = str(e)
    finally:
        if status.success is True and need_to_close:
            self.disconnect()
        if status.success is False and self.is_connected is True:
            self.is_connected = False

    return status
def native_query(self, query: str) -> Response:
    """Execute a raw SQL query and wrap the outcome in a HandlerResponse.

    Args:
        query (str): query in the database's native SQL dialect.

    Returns:
        HandlerResponse: a TABLE response holding a DataFrame when the
        statement produced a result set, an OK response when it did not,
        or an ERROR response carrying the error message on failure.
    """
    need_to_close = self.is_connected is False

    # A failed connection is reported like any other query failure instead
    # of surfacing as an AttributeError on a missing connection object.
    try:
        conn = self.connect()
        if conn is None:
            raise ConnectionError(f'No connection could be established to {self.database}')
    except Exception as e:
        logger.error(f'Error connecting to {self.database} to run query, {e}')
        return Response(
            RESPONSE_TYPE.ERROR,
            error_message=str(e)
        )

    try:
        with conn.cursor() as cur:
            cur.execute(query)
            # description is only populated for statements returning rows
            if cur.description:
                rows = cur.fetchall()
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(
                        rows,
                        columns=[column[0] for column in cur.description]
                    )
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
            conn.commit()
    except Exception as e:
        logger.error(f'Error running query: {query} on {self.database}!')
        response = Response(
            RESPONSE_TYPE.ERROR,
            error_message=str(e)
        )
        # A failing rollback must not hide the original query error.
        try:
            conn.rollback()
        except Exception as rollback_error:
            logger.error(f'Error rolling back on {self.database}, {rollback_error}')
    finally:
        # Always release a connection that was opened just for this query.
        if need_to_close is True:
            self.disconnect()

    return response
def query(self, query: ASTNode) -> StatusResponse:
    """Turn a parsed query into SQL text and run it.

    Args:
        query (ASTNode): The SQL query; any other object is converted
            with str().

    Returns:
        Response: The result of native_query() for the rendered SQL.
    """
    # AST nodes know how to render themselves; anything else is stringified.
    rendered_sql = query.to_string() if isinstance(query, ASTNode) else str(query)
    return self.native_query(rendered_sql)
def get_tables(self) -> Response:
    """Get a list of all the tables in the database.

    When a schema is configured only that schema is listed; otherwise every
    table outside the SYSTEM schema is returned.

    Returns:
        Response: Names of the tables in a 'table_name' column. When the
        underlying query produced no data frame (e.g. it failed), the
        response of native_query() is returned unchanged.
    """
    if self.schema:
        # Double embedded single quotes so the schema stays one SQL literal.
        schema_literal = str(self.schema).replace("'", "''")
        query = f''' SELECT TABLENAME FROM SYSTEM.TABLES WHERE SCHEMA = '{schema_literal}' '''
    else:
        query = ''' SELECT TABLENAME FROM SYSTEM.TABLES WHERE SCHEMA != 'SYSTEM' '''

    result = self.native_query(query)
    df = getattr(result, 'data_frame', None)
    # No data frame means there is nothing to rename; hand the response
    # back as is instead of failing on a missing frame.
    if df is None:
        return result

    result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
    return result
def get_columns(self, table_name: str) -> Response:
    """Get details about a table.

    Args:
        table_name (str): Name of the table to retrieve details of.

    Returns:
        Response: The result of querying SYSTEM.FIELDS for the field names
        of the given table.
    """
    # Double embedded single quotes so the name cannot terminate the SQL
    # string literal it is placed in.
    table_literal = str(table_name).replace("'", "''")
    query = f''' SELECT FIELD FROM SYSTEM.FIELDS WHERE TABLENAME='{table_literal}' '''
    return self.native_query(query)