Coverage for mindsdb / integrations / handlers / hsqldb_handler / hsqldb_handler.py: 0%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2import pyodbc 

3 

4from mindsdb_sql_parser.ast.base import ASTNode 

5from mindsdb.integrations.libs.base import DatabaseHandler 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.utilities import log 

8from mindsdb.integrations.libs.response import ( 

9 HandlerStatusResponse as StatusResponse, 

10 HandlerResponse as Response, 

11 RESPONSE_TYPE 

12) 

13from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

14 

# Module-level logger, named after this module and obtained through the
# project's logging helper (mindsdb.utilities.log).
logger = log.getLogger(__name__)

16 

17 

class HSQLDBHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the HyperSQL statements.

    The connection is made through pyodbc using the "PostgreSQL Unicode" ODBC
    driver, and AST queries are rendered with the 'postgres' dialect.
    """

    name = 'hsqldb'

    def __init__(self, name: str, **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database
                (server_name, port, database_name, username, password)
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        # Connection state is set first so that __del__ stays safe even if
        # the rest of the constructor raises.
        self.connection = None
        self.is_connected = False

        self.parser = parse_sql
        self.dialect = 'hsqldb'
        # Fall back to an empty dict so a missing 'connection_data' does not
        # crash with AttributeError on the .get() calls below.
        self.connection_args = kwargs.get('connection_data') or {}
        self.server_name = self.connection_args.get('server_name', 'localhost')
        self.port = self.connection_args.get('port')
        self.database_name = self.connection_args.get('database_name')
        self.username = self.connection_args.get('username')
        self.password = self.connection_args.get('password')
        self.conn_str = (
            f"DRIVER={{PostgreSQL Unicode}};"
            f"SERVER={self.server_name};"
            f"PORT={self.port};"
            f"DATABASE={self.database_name};"
            f"UID={self.username};"
            f"PWD={self.password};"
            f"Trusted_Connection=True"
        )

    def __del__(self):
        # getattr guards against a partially constructed instance.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def connect(self) -> pyodbc.Connection:
        """
        Set up the connection required by the handler.
        Returns:
            The open pyodbc connection (an existing one is reused).
        """
        if self.is_connected is True:
            return self.connection

        self.connection = pyodbc.connect(self.conn_str, timeout=10)
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """
        Close any existing connections.
        Returns:
            None when there was nothing to close, otherwise the updated
            (False) connection flag.
        """
        if self.is_connected is False:
            return

        self.connection.close()
        self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.
        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        # Only close the connection afterwards if this call opened it.
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to HSQLDB, {e}!')
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: TABLE for statements that produce a result set
            (possibly with zero rows), OK for statements that do not, ERROR
            if execution fails.
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        try:
            with connection.cursor() as cursor:
                try:
                    cursor.execute(query)
                    # cursor.description is None for statements without a
                    # result set (INSERT, UPDATE, DDL, ...). Calling
                    # fetchall() on those raises, so branch on it first.
                    if cursor.description is not None:
                        columns = [column[0] for column in cursor.description]
                        rows = [tuple(row) for row in cursor.fetchall()]
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            data_frame=pd.DataFrame.from_records(
                                rows,
                                columns=columns
                            )
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                        connection.commit()
                except Exception as e:
                    logger.error(f'Error running query: {query}! {e}')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(e)
                    )
        finally:
            # Release the connection even if something unexpected raised.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender('postgres')
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def _fetch_rows(self, query: str, *params) -> list:
        """
        Run a read-only query and return all rows as plain tuples.

        Opens a connection if needed and closes it again when this call was
        the one that opened it. Errors propagate to the caller.
        Args:
            query (str): SQL text, using '?' placeholders for parameters
            *params: values bound to the placeholders
        Returns:
            list of tuples, one per row
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        try:
            with connection.cursor() as cursor:
                cursor.execute(query, *params)
                return [tuple(row) for row in cursor.fetchall()]
        finally:
            if need_to_close is True:
                self.disconnect()

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.
        Returns:
            HandlerResponse with a single 'table_name' column
        """
        # Workaround since cursor.tables() wont work with postgres driver
        rows = self._fetch_rows(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema NOT IN ('information_schema', 'pg_catalog') "
            "AND table_type='BASE TABLE'"
        )
        df = pd.DataFrame([row[0] for row in rows], columns=['table_name'])

        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=df
        )

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse with 'column_name' and 'data_type' columns
        """
        # Workaround since cursor.columns() wont work with postgres driver.
        # The table name is bound as a parameter: interpolating it unquoted
        # made it an identifier rather than a string value, and left the
        # query open to SQL injection.
        rows = self._fetch_rows(
            'SELECT column_name, data_type FROM information_schema.columns '
            'WHERE table_name = ?',
            table_name
        )
        df = pd.DataFrame(
            [(row[0], row[1]) for row in rows],
            columns=['column_name', 'data_type']
        )

        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=df
        )