Coverage for mindsdb / integrations / handlers / hsqldb_handler / hsqldb_handler.py: 0%
91 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pandas as pd
2import pyodbc
4from mindsdb_sql_parser.ast.base import ASTNode
5from mindsdb.integrations.libs.base import DatabaseHandler
6from mindsdb_sql_parser import parse_sql
7from mindsdb.utilities import log
8from mindsdb.integrations.libs.response import (
9 HandlerStatusResponse as StatusResponse,
10 HandlerResponse as Response,
11 RESPONSE_TYPE
12)
13from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
15logger = log.getLogger(__name__)
class HSQLDBHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the HyperSQL statements.

    The connection is made through ODBC (``pyodbc``) using the
    ``PostgreSQL Unicode`` driver named in the connection string, and AST
    queries are rendered with the ``postgres`` dialect.
    """

    name = 'hsqldb'

    def __init__(self, name: str, **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database
                (``server_name``, ``port``, ``database_name``, ``username``,
                ``password``)
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'hsqldb'
        # Fall back to an empty dict so a missing ``connection_data`` does not
        # crash with AttributeError on the ``.get`` calls below.
        self.connection_args = kwargs.get('connection_data') or {}
        self.server_name = self.connection_args.get('server_name', 'localhost')
        self.port = self.connection_args.get('port')
        self.database_name = self.connection_args.get('database_name')
        self.username = self.connection_args.get('username')
        self.password = self.connection_args.get('password')
        self.conn_str = f"DRIVER={{PostgreSQL Unicode}};SERVER={self.server_name};PORT={self.port};DATABASE={self.database_name};UID={self.username};PWD={self.password};Trusted_Connection=True"
        self.connection = None
        self.is_connected = False

    def __del__(self):
        # ``__init__`` may have failed before ``is_connected`` was assigned,
        # so read the flag defensively.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def connect(self) -> 'pyodbc.Connection':
        """
        Set up the connection required by the handler.

        An already-open connection is reused instead of opening a new one.

        Returns:
            pyodbc.Connection: the open ODBC connection
        """
        if self.is_connected is True:
            return self.connection

        self.connection = pyodbc.connect(self.conn_str, timeout=10)
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """
        Close any existing connections.

        Returns:
            None when there was nothing to close, otherwise the updated
            ``is_connected`` flag (False).
        """
        if self.is_connected is False:
            return

        try:
            self.connection.close()
        finally:
            # Reset the flag even when close() raises, so the handler never
            # keeps reporting a connection in an unknown state as open.
            self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.
        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to HSQLDB, {e}!')
            response.error_message = str(e)
        finally:
            # Only release the connection if this call was the one to open it.
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: TABLE for statements that produce a result set
                (even an empty one), OK for statements that do not, ERROR when
                execution fails.
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        try:
            with connection.cursor() as cursor:
                try:
                    cursor.execute(query)
                    # ``description`` is None for statements without a result
                    # set (INSERT/UPDATE/DDL); calling fetchall() on those
                    # raises, so branch on it instead of on the fetched rows.
                    if cursor.description is not None:
                        rows = cursor.fetchall()
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            data_frame=pd.DataFrame.from_records(
                                [tuple(row) for row in rows],
                                columns=[col[0] for col in cursor.description],
                            ),
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                        connection.commit()
                except Exception as e:
                    logger.error(f'Error running query: {query}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(e),
                    )
        finally:
            # Release the connection even if obtaining the cursor failed.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender('postgres')
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def _fetch_rows(self, sql: str, *params) -> list:
        """
        Run a metadata query and return all of its rows.

        Closes the cursor afterwards and, when this call had to open the
        connection itself, closes the connection too. Errors propagate to
        the caller.

        Args:
            sql (str): statement to execute; use ``?`` for bound parameters
            *params: values bound to the ``?`` markers, in order
        Returns:
            list: rows returned by the statement
        """
        need_to_close = self.is_connected is False
        connection = self.connect()
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(sql, *params)
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            if need_to_close is True:
                self.disconnect()

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.
        Returns:
            HandlerResponse: TABLE response with a single ``table_name`` column
        """
        # Workaround since cursor.tables() wont work with postgres driver
        rows = self._fetch_rows(
            "SELECT * FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'pg_catalog') AND table_type='BASE TABLE'"
        )
        # Index 2 of each row is read as the table name.
        df = pd.DataFrame([row[2] for row in rows], columns=['table_name'])
        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse: TABLE response with ``column_name`` and
                ``data_type`` columns
        """
        # Workaround since cursor.columns() wont work with postgres driver.
        # The table name is bound as a parameter: interpolating it unquoted
        # produced invalid SQL and allowed SQL injection.
        rows = self._fetch_rows(
            'SELECT * FROM information_schema.columns WHERE table_name = ?',
            table_name,
        )
        # Indexes 3 and 7 of each row are read as column name and data type.
        df = pd.DataFrame(
            [(row[3], row[7]) for row in rows],
            columns=['column_name', 'data_type']
        )

        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )