Coverage for mindsdb / integrations / handlers / informix_handler / informix_handler.py: 0%

111 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from collections import OrderedDict 

2from typing import Optional 

3from mindsdb_sql_parser.ast.base import ASTNode 

4from mindsdb.integrations.libs.base import DatabaseHandler 

5from mindsdb.utilities import log 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8from mindsdb.integrations.libs.response import ( 

9 HandlerStatusResponse as StatusResponse, 

10 HandlerResponse as Response, 

11 RESPONSE_TYPE, 

12) 

13from mindsdb.integrations.libs.const import HANDLER_CONNECTION_ARG_TYPE as ARG_TYPE 

14 

15 

16import pandas as pd 

17import IfxPyDbi as Ifx 

18from sqlalchemy_informix.ibmdb import InformixDialect 

19 

20logger = log.getLogger(__name__) 

21 

22 

class InformixHandler(DatabaseHandler):
    """MindsDB database handler for Informix, using the ``IfxPyDbi`` driver.

    The handler builds a driver connection string from ``connection_data``,
    opens connections lazily, runs raw or AST-rendered SQL, and lists tables
    and columns through the driver connection's ``tables()`` / ``columns()``
    calls.
    """

    name = "informix"

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """Initialize the handler.

        Args:
            name (str): name of particular handler instance.
            connection_data (dict): parameters for connecting to the database.
                Must contain ``server``, ``database``, ``user``, ``password``,
                ``schema_name``, ``host`` and ``port``; ``logging_enabled`` is
                optional and defaults to True.
            **kwargs: arbitrary keyword arguments, stored as ``self.kwargs``.

        Raises:
            KeyError: if one of the mandatory connection parameters is missing.
        """
        super().__init__(name)

        self.kwargs = kwargs
        self.parser = parse_sql
        # The documented argument is spelled ``logging_enabled``; the legacy
        # misspelling ``loging_enabled`` is still honoured so that existing
        # configurations keep working. The attribute keeps its historical name.
        self.loging_enabled = connection_data.get(
            "logging_enabled", connection_data.get("loging_enabled", True)
        )
        self.server = connection_data["server"]
        self.database = connection_data["database"]
        self.user = connection_data["user"]
        self.password = connection_data["password"]
        self.schemaName = connection_data["schema_name"]
        self.host = connection_data["host"]
        self.port = connection_data["port"]
        self.connString = (
            f"SERVER={self.server};DATABASE={self.database};HOST={self.host};"
            f"PORT={self.port};UID={self.user};PWD={self.password};"
        )

        self.connection = None
        self.is_connected = False

    def connect(self):
        """Open the driver connection if it is not already open.

        Sets ``self.is_connected`` to True on success.

        Returns:
            The driver connection object (the cached one when already connected).

        Raises:
            Exception: whatever the driver raises when the connection attempt
                fails. The error is logged and re-raised so that callers such
                as ``check_connection`` can report the failure instead of
                continuing with ``self.connection`` still unset.
        """
        if self.is_connected is True:
            return self.connection

        try:
            self.connection = Ifx.connect(self.connString, "", "")
        except Exception as e:
            logger.error(f"Error while connecting to {self.database}, {e}")
            raise

        self.is_connected = True
        return self.connection

    def disconnect(self):
        """Close the existing connection, if any, and clear ``self.is_connected``.

        Errors raised while closing are logged and not propagated; in that
        case ``self.is_connected`` is left unchanged.
        """
        if self.is_connected is False:
            return
        try:
            self.connection.close()
            self.is_connected = False
        except Exception as e:
            logger.error(f"Error while disconnecting to {self.database}, {e}")

    def check_connection(self) -> StatusResponse:
        """Check that a connection to the database can be established.

        A connection opened only for this check is closed again afterwards.

        Returns:
            HandlerStatusResponse: ``success`` is True when connecting worked;
            otherwise ``error_message`` carries the text of the error.
        """
        responseCode = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            responseCode.success = True
        except Exception as e:
            logger.error(f"Error connecting to database {self.database}, {e}!")
            responseCode.error_message = str(e)
        finally:
            if responseCode.success is True and need_to_close:
                self.disconnect()
            if responseCode.success is False and self.is_connected is True:
                self.is_connected = False

        return responseCode

    def native_query(self, query: str) -> Response:
        """Run a raw SQL string against the database.

        Args:
            query (str): query in native format (SQL text).

        Returns:
            HandlerResponse: a TABLE response holding the fetched rows when the
            statement produced a result set, an OK response otherwise, or an
            ERROR response carrying the error text when anything failed
            (including a failure to connect).
        """
        need_to_close = self.is_connected is False
        cur = None
        try:
            conn = self.connect()
            cur = conn.cursor()
            cur.execute(query)

            # `_result_set_produced` is the driver cursor's flag telling
            # whether the executed statement returned rows.
            if cur._result_set_produced:
                result = cur.fetchall()
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(result, columns=[x[0] for x in cur.description]),
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
                if self.loging_enabled:
                    self.connection.commit()
        except Exception as e:
            logger.error(f"Error running query: {query} on {self.database}, {e}")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
            # Roll back only when there is a live connection to roll back on;
            # a failing rollback must not hide the original error response.
            if self.loging_enabled and self.is_connected:
                try:
                    self.connection.rollback()
                except Exception as rollback_error:
                    logger.error(f"Error while rolling back on {self.database}, {rollback_error}")
        finally:
            # Always release the cursor, and the connection if this call opened it.
            try:
                if cur is not None:
                    cur.close()
            finally:
                if need_to_close is True:
                    self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """Render an AST query to Informix SQL and execute it.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc.

        Returns:
            HandlerResponse: the result of ``native_query`` on the rendered SQL.
        """
        renderer = SqlalchemyRender(InformixDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """Return the list of tables in the configured schema.

        Only entries whose ``TABLE_SCHEM`` equals the ``schema_name`` given at
        construction are returned. The connection is left open afterwards.

        Returns:
            HandlerResponse: a TABLE response with a single ``TABLE_NAME``
            column (see
            https://dev.mysql.com/doc/refman/8.0/en/information-schema-tables-table.html
            for the reference layout), an OK response when the driver reports
            no tables, or an ERROR response when connecting or the lookup fails.
        """
        try:
            self.connect()
            result = self.connection.tables()
            if result:
                table_names = [
                    row["TABLE_NAME"] for row in result if row["TABLE_SCHEM"] == self.schemaName
                ]
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(table_names, columns=["TABLE_NAME"]),
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f"Error while getting tables from {self.database}: {e}")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))

        return response

    def get_columns(self, table_name: str) -> Response:
        """Return the list of columns of a table.

        The connection is left open afterwards.

        Args:
            table_name (str): name of one of tables returned by self.get_tables().

        Returns:
            HandlerResponse: a TABLE response with a single ``COLUMN_NAME``
            column (see
            https://dev.mysql.com/doc/refman/8.0/en/information-schema-columns-table.html
            for the reference layout), an OK response when the driver reports
            no columns, or an ERROR response when connecting or the lookup fails.
        """
        try:
            self.connect()
            result = self.connection.columns(table_name=table_name)
            if result:
                column_names = [column["COLUMN_NAME"] for column in result]
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(column_names, columns=["COLUMN_NAME"]),
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f"Error while getting columns of {table_name} from {self.database}: {e}")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))

        return response

218 

219 

# Schema of the connection parameters accepted by the Informix handler.
# Each entry maps a parameter name to its type and a human-readable description.
connection_args = OrderedDict(
    server={
        "type": ARG_TYPE.STR,
        "description": """
            The server name you want to get connected.
        """,
    },
    database={
        "type": ARG_TYPE.STR,
        "description": """
            The database name to use when connecting with the Informix server.
        """,
    },
    user={
        "type": ARG_TYPE.STR,
        "description": "The user name used to authenticate with the Informix server.",
    },
    password={
        "type": ARG_TYPE.STR,
        "description": "The password to authenticate the user with the Informix server.",
    },
    host={
        "type": ARG_TYPE.STR,
        "description": "The host name or IP address of the Informix server/database.",
    },
    port={
        "type": ARG_TYPE.INT,
        "description": "Specify port to connect Informix through TCP/IP",
    },
    schema_name={
        "type": ARG_TYPE.STR,
        "description": "Specify the schema name for showing tables ",
    },
    logging_enabled={
        "type": ARG_TYPE.BOOL,
        "description": """
            Used for COMMIT and ROLLBACK as this command works only for logging enabled database.
            Note: Its optional.
            Default is TRUE
        """,
    },
)

262 

# Example values for the connection parameters declared in `connection_args`.
# `port` is an integer to match its declared ARG_TYPE.INT type.
connection_args_example = OrderedDict(
    server="server",
    database="stores_demo",
    user="informix",
    password="in4mix",
    host="127.0.0.1",
    port=9091,
    schema_name="Love",
)