Coverage for mindsdb / integrations / handlers / ingres_handler / ingres_handler.py: 0%
91 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pyodbc
3import pandas as pd
4from mindsdb_sql_parser import parse_sql
5from ingres_sa_dialect.base import IngresDialect
6from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
7from mindsdb_sql_parser.ast.base import ASTNode
8from mindsdb.integrations.libs.base import DatabaseHandler
10from mindsdb.utilities import log
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13 HandlerResponse as Response,
14 RESPONSE_TYPE
15)
# Module-level logger obtained from mindsdb.utilities.log, keyed by this module's name.
18logger = log.getLogger(__name__)
class IngresHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Ingres statements.

    Connections are opened through pyodbc using the ``Ingres`` ODBC driver.
    """

    name = 'ingres'

    def __init__(self, name: str, **kwargs):
        """
        Initializes a new instance of the Ingres handler.

        Args:
            name (str): The name of the database.
            **kwargs: parameters for connecting to the database. The
                ``connection_data`` entry is a dict that may hold the keys
                ``database``, ``server``, ``user``, ``password`` and
                ``servertype`` (defaults to ``'ingres'``).
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'ingres'
        self.connection_args = kwargs.get('connection_data')
        self.database = self.connection_args.get('database')
        self.server = self.connection_args.get('server')
        self.user = self.connection_args.get('user')
        self.password = self.connection_args.get('password')
        self.servertype = self.connection_args.get('servertype', 'ingres')
        self.connection = None
        self.is_connected = False

    def __del__(self):
        """
        Destructor for the Ingres class: closes the connection if it is open.
        """
        # __init__ may have raised before `is_connected` was assigned, so the
        # attribute is read defensively to keep the destructor from failing.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def connect(self):
        """
        Establishes a connection to the Ingres server.

        An already-open connection is reused instead of opening a new one.

        Returns:
            The pyodbc connection object.
        """
        if self.is_connected:
            return self.connection

        conn_str = f"Driver={{Ingres}};Server={self.server};Database={self.database};UID={self.user};" \
                   f"PWD={self.password};ServerType={self.servertype}"

        self.connection = pyodbc.connect(conn_str)
        self.is_connected = True
        return self.connection

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        Returns:
            HandlerStatusResponse: ``success`` is True when a connection could
            be established; otherwise ``error_message`` holds the error text.
        """
        response = StatusResponse(False)
        # Only close the connection afterwards if this check is what opened it.
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Ingres, {e}!')
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def disconnect(self):
        """
        Closes the connection to the Ingres server.

        Returns:
            None when there was no open connection, otherwise the updated
            ``is_connected`` flag (False).
        """
        if self.is_connected is False:
            return

        self.connection.close()
        self.is_connected = False
        return self.is_connected

    def native_query(self, query: str) -> Response:
        """
        Execute a raw SQL query against Ingres.

        Args:
            query (str): SQL query to execute.

        Returns:
            HandlerResponse: a TABLE response when the statement produced a
            result set (possibly with zero rows), an OK response when it did
            not, or an ERROR response carrying the exception text when
            execution failed.
        """
        # Only close the connection afterwards if this call is what opened it.
        need_to_close = self.is_connected is False

        connection = self.connect()
        try:
            with connection.cursor() as cursor:
                try:
                    cursor.execute(query)
                    # Per DB-API, `description` is None when the statement
                    # returned no result set (INSERT/UPDATE/DDL). Calling
                    # fetchall() in that case raises, which would report a
                    # successful statement as an error.
                    if cursor.description is not None:
                        rows = cursor.fetchall()
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            data_frame=pd.DataFrame.from_records(
                                rows,
                                columns=[col[0] for col in cursor.description]
                            )
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                        connection.commit()
                except Exception as e:
                    logger.error(f'Error running query: {query} on {self.connection_args["database"]}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(e)
                    )
        finally:
            # Runs even if creating the cursor fails, so a connection opened
            # just for this call is not left behind.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Render an AST query to Ingres SQL and execute it.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc

        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender(IngresDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Gets a list of table names in the database.

        Returns:
            HandlerResponse: a TABLE response whose data frame has a single
            ``table_name`` column, one row per entry of ``iitables`` with
            ``table_type = 'T'``.
        """
        connection = self.connect()
        cursor = connection.cursor()
        try:
            cursor.execute(
                "SELECT table_name FROM iitables WHERE table_type = 'T'")
            table_names = [row[0] for row in cursor.fetchall()]
        finally:
            cursor.close()

        # The query yields one value per row, so the frame has exactly one
        # column; declaring more columns than values makes pandas raise.
        df = pd.DataFrame(table_names, columns=['table_name'])

        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )

    def get_columns(self, table_name: str) -> Response:
        """
        Gets the columns of the specified table.

        Args:
            table_name (str): The name of the table to get column names from.

        Returns:
            HandlerResponse: a TABLE response whose data frame has the columns
            ``column_name`` and ``data_type``.
        """
        conn = self.connect()
        cursor = conn.cursor()
        try:
            # The table name is bound as a parameter rather than formatted
            # into the SQL text, so it cannot alter the statement.
            # NOTE(review): `column_datatype` is taken to be the type column
            # of the Ingres `iicolumns` catalog - confirm against the target
            # Ingres version.
            cursor.execute(
                "SELECT column_name, column_datatype FROM iicolumns WHERE table_name = ?",
                table_name
            )
            # Convert driver row objects to plain tuples so pandas sees
            # two fields per record.
            records = [tuple(row) for row in cursor.fetchall()]
        finally:
            cursor.close()

        df = pd.DataFrame.from_records(
            records,
            columns=['column_name', 'data_type']
        )

        return Response(
            RESPONSE_TYPE.TABLE,
            df
        )