Coverage for mindsdb / integrations / handlers / informix_handler / informix_handler.py: 0%
111 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from collections import OrderedDict
2from typing import Optional
3from mindsdb_sql_parser.ast.base import ASTNode
4from mindsdb.integrations.libs.base import DatabaseHandler
5from mindsdb.utilities import log
6from mindsdb_sql_parser import parse_sql
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb.integrations.libs.response import (
9 HandlerStatusResponse as StatusResponse,
10 HandlerResponse as Response,
11 RESPONSE_TYPE,
12)
13from mindsdb.integrations.libs.const import HANDLER_CONNECTION_ARG_TYPE as ARG_TYPE
16import pandas as pd
17import IfxPyDbi as Ifx
18from sqlalchemy_informix.ibmdb import InformixDialect
20logger = log.getLogger(__name__)
class InformixHandler(DatabaseHandler):
    """Database handler that talks to Informix through the IfxPyDbi driver.

    The handler opens connections lazily via :meth:`connect`; methods that
    open a connection themselves close it again when they are done, while a
    connection that was already open is left open for the caller.
    """

    name = "informix"

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """Initialize the handler.

        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                Must contain ``server``, ``database``, ``user``, ``password``,
                ``schema_name``, ``host`` and ``port``; ``logging_enabled`` is
                optional and defaults to True.
            **kwargs: arbitrary keyword arguments.

        Raises:
            KeyError: if a mandatory connection parameter is missing.
        """
        super().__init__(name)

        self.kwargs = kwargs
        self.parser = parse_sql

        # The documented argument is ``logging_enabled``; the historical
        # misspelling ``loging_enabled`` is still honoured so that existing
        # configurations keep working. The attribute keeps its old
        # (misspelled) name because external code may read it.
        if "logging_enabled" in connection_data:
            self.loging_enabled = connection_data["logging_enabled"]
        elif "loging_enabled" in connection_data:
            self.loging_enabled = connection_data["loging_enabled"]
        else:
            self.loging_enabled = True

        self.server = connection_data["server"]
        self.database = connection_data["database"]
        self.user = connection_data["user"]
        self.password = connection_data["password"]
        self.schemaName = connection_data["schema_name"]
        self.host = connection_data["host"]
        self.port = connection_data["port"]
        self.connString = ("SERVER={0};DATABASE={1};HOST={2};PORT={3};UID={4};PWD={5};").format(
            self.server, self.database, self.host, self.port, self.user, self.password
        )

        self.connection = None
        self.is_connected = False

    def connect(self):
        """Open a connection to the database, reusing an existing one.

        Switches ``self.is_connected`` to True on success.

        Returns:
            Connection Object

        Raises:
            Exception: whatever the driver raised while connecting. The error
                is logged and re-raised so callers (notably
                :meth:`check_connection`) can tell that the attempt failed
                instead of receiving ``None``.
        """
        if self.is_connected is True:
            return self.connection

        try:
            self.connection = Ifx.connect(self.connString, "", "")
            self.is_connected = True
        except Exception as e:
            logger.error(f"Error while connecting to {self.database}, {e}")
            raise

        return self.connection

    def disconnect(self):
        """Close the existing connection, if any.

        Switches ``self.is_connected`` to False when the close succeeds;
        a failure to close is logged and otherwise ignored.
        """
        if self.is_connected is False:
            return
        try:
            self.connection.close()
            self.is_connected = False
        except Exception as e:
            logger.error(f"Error while disconnecting to {self.database}, {e}")

        return

    def check_connection(self) -> StatusResponse:
        """Check connection to the handler.

        Returns:
            HandlerStatusResponse: ``success`` is True when a connection could
            be established; otherwise ``error_message`` holds the driver error.
        """
        responseCode = StatusResponse(False)
        # Only close the connection afterwards if this call opened it.
        need_to_close = self.is_connected is False

        try:
            self.connect()
            responseCode.success = True
        except Exception as e:
            logger.error(f"Error connecting to database {self.database}, {e}!")
            responseCode.error_message = str(e)
        finally:
            if responseCode.success is True and need_to_close:
                self.disconnect()
            if responseCode.success is False and self.is_connected is True:
                self.is_connected = False

        return responseCode

    def native_query(self, query: str) -> Response:
        """Run a raw SQL string against the database.

        Args:
            query (str): query in native format (SQL text).

        Returns:
            HandlerResponse: a TABLE response with the fetched rows when the
            statement produced a result set, an OK response when it did not,
            or an ERROR response carrying the error message on failure.
        """
        need_to_close = self.is_connected is False
        cur = None
        try:
            # Connecting inside the try turns a connection failure into an
            # ERROR response rather than an unhandled exception.
            conn = self.connect()
            cur = conn.cursor()
            cur.execute(query)

            # NOTE(review): ``_result_set_produced`` is a private attribute of
            # the driver cursor; it is used here to decide whether there are
            # rows to fetch.
            if cur._result_set_produced:
                result = cur.fetchall()
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(result, columns=[x[0] for x in cur.description]),
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
            # COMMIT/ROLLBACK are only issued when logging is enabled.
            if self.loging_enabled:
                self.connection.commit()
        except Exception as e:
            logger.error(f"Error running query: {query} on {self.database}")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
            # Skip the rollback when there is no live connection to roll back,
            # and never let a failing rollback mask the original error.
            if self.loging_enabled and self.is_connected:
                try:
                    self.connection.rollback()
                except Exception as rollback_error:
                    logger.error(f"Error rolling back on {self.database}, {rollback_error}")
        finally:
            try:
                if cur is not None:
                    cur.close()
            finally:
                if need_to_close is True:
                    self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """Render an AST query to Informix SQL and execute it.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc

        Returns:
            HandlerResponse: the result of :meth:`native_query` for the
            rendered SQL string.
        """
        renderer = SqlalchemyRender(InformixDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """Return the tables of the configured schema.

        Returns:
            HandlerResponse: a TABLE response with a single ``TABLE_NAME``
            column listing the tables whose schema equals ``schema_name``;
            an OK response when the driver reports no tables at all; an ERROR
            response when connecting or listing fails.
        """
        try:
            self.connect()
            result = self.connection.tables()
            if result:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(
                        [x["TABLE_NAME"] for x in result if x["TABLE_SCHEM"] == self.schemaName],
                        columns=["TABLE_NAME"],
                    ),
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f"Error while getting tables from {self.database}: {e}")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))

        return response

    def get_columns(self, table_name: str) -> Response:
        """Return the column names of a table.

        Args:
            table_name (str): name of one of tables returned by self.get_tables()

        Returns:
            HandlerResponse: a TABLE response with a single ``COLUMN_NAME``
            column; an OK response when the driver reports no columns; an
            ERROR response when connecting or listing fails.
        """
        try:
            self.connect()
            result = self.connection.columns(table_name=table_name)
            if result:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(
                        [column["COLUMN_NAME"] for column in result],
                        columns=["COLUMN_NAME"],
                    ),
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f"Error while getting columns of {table_name} from {self.database}: {e}")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))

        return response
# Declares the connection parameters accepted by the Informix handler:
# each entry maps an argument name to its type and a user-facing description.
# The descriptions previously referred to a "DB2 server"; this handler
# connects to Informix, so the wording is corrected here.
connection_args = OrderedDict(
    server={
        "type": ARG_TYPE.STR,
        "description": """
            The server name you want to get connected.
        """,
    },
    database={
        "type": ARG_TYPE.STR,
        "description": """
            The database name to use when connecting with the Informix server.
        """,
    },
    user={
        "type": ARG_TYPE.STR,
        "description": "The user name used to authenticate with the Informix server.",
    },
    password={
        "type": ARG_TYPE.STR,
        "description": "The password to authenticate the user with the Informix server.",
    },
    host={
        "type": ARG_TYPE.STR,
        "description": "The host name or IP address of the Informix server/database.",
    },
    port={
        "type": ARG_TYPE.INT,
        "description": "Specify port to connect Informix through TCP/IP",
    },
    schema_name={
        "type": ARG_TYPE.STR,
        "description": "Specify the schema name for showing tables ",
    },
    logging_enabled={
        "type": ARG_TYPE.BOOL,
        "description": """
            Used for COMMIT and ROLLBACK as this command works only for logging enabled database.
            Note: Its optional.
            Default is TRUE
        """,
    },
)
# Placeholder connection parameters illustrating the expected keys; the
# optional logging flag is omitted.
connection_args_example = OrderedDict(
    [
        ("server", "server"),
        ("database", "stores_demo"),
        ("user", "informix"),
        ("password", "in4mix"),
        ("host", "127.0.0.1"),
        ("port", "9091"),
        ("schema_name", "Love"),
    ]
)