Coverage for mindsdb / integrations / handlers / ingres_handler / ingres_handler.py: 0%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pyodbc 

2 

3import pandas as pd 

4from mindsdb_sql_parser import parse_sql 

5from ingres_sa_dialect.base import IngresDialect 

6from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

7from mindsdb_sql_parser.ast.base import ASTNode 

8from mindsdb.integrations.libs.base import DatabaseHandler 

9 

10from mindsdb.utilities import log 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13 HandlerResponse as Response, 

14 RESPONSE_TYPE 

15) 

16 

17 

18logger = log.getLogger(__name__) 

19 

20 

21class IngresHandler(DatabaseHandler): 

22 """ 

23 This handler handles connection and execution of the Ingres statements. 

24 """ 

25 

26 name = 'ingres' 

27 

28 def __init__(self, name: str, **kwargs): 

29 """ 

30 Initializes a new instance of the Ingres handler. 

31 

32 Args: 

33 name (str): The name of the database. 

34 **kwargs: parameters for connecting to the database 

35 """ 

36 super().__init__(name) 

37 self.parser = parse_sql 

38 self.dialect = 'ingres' 

39 self.connection_args = kwargs.get('connection_data') 

40 self.database = self.connection_args.get('database') 

41 self.server = self.connection_args.get('server') 

42 self.user = self.connection_args.get('user') 

43 self.password = self.connection_args.get('password') 

44 self.servertype = self.connection_args.get('servertype', 'ingres') 

45 self.connection = None 

46 self.is_connected = False 

47 

48 def __del__(self): 

49 """ 

50 Destructor for the Ingres class. 

51 """ 

52 if self.is_connected is True: 

53 self.disconnect() 

54 

55 def connect(self): 

56 """ 

57 Establishes a connection to the Ingres server. 

58 Returns: 

59 HandlerStatusResponse 

60 """ 

61 if self.is_connected: 

62 return self.connection 

63 

64 conn_str = f"Driver={{Ingres}};Server={self.server};Database={self.database};UID={self.user};" \ 

65 f"PWD={self.password};ServerType={self.servertype}" 

66 

67 self.connection = pyodbc.connect(conn_str) 

68 self.is_connected = True 

69 return self.connection 

70 

71 def check_connection(self) -> StatusResponse: 

72 """ 

73 Check connection to the handler. 

74 Returns: 

75 HandlerStatusResponse 

76 """ 

77 

78 response = StatusResponse(False) 

79 need_to_close = self.is_connected is False 

80 

81 try: 

82 self.connect() 

83 response.success = True 

84 except Exception as e: 

85 logger.error(f'Error connecting to Ingres, {e}!') 

86 response.error_message = str(e) 

87 finally: 

88 if response.success is True and need_to_close: 

89 self.disconnect() 

90 if response.success is False and self.is_connected is True: 

91 self.is_connected = False 

92 

93 return response 

94 

95 def disconnect(self): 

96 """ 

97 Closes the connection to the Ingres server. 

98 """ 

99 

100 if self.is_connected is False: 

101 return 

102 

103 self.connection.close() 

104 self.is_connected = False 

105 return self.is_connected 

106 

107 def native_query(self, query: str) -> Response: 

108 """ 

109 Receive raw query and act upon it somehow. 

110 Args: 

111 query (str): SQL query to execute. 

112 Returns: 

113 HandlerResponse 

114 """ 

115 need_to_close = self.is_connected is False 

116 

117 connection = self.connect() 

118 with connection.cursor() as cursor: 

119 try: 

120 cursor.execute(query) 

121 result = cursor.fetchall() 

122 if result: 

123 response = Response( 

124 RESPONSE_TYPE.TABLE, 

125 data_frame=pd.DataFrame.from_records( 

126 result, 

127 columns=[x[0] for x in cursor.description] 

128 ) 

129 ) 

130 else: 

131 response = Response(RESPONSE_TYPE.OK) 

132 connection.commit() 

133 except Exception as e: 

134 logger.error(f'Error running query: {query} on {self.connection_args["database"]}!') 

135 response = Response( 

136 RESPONSE_TYPE.ERROR, 

137 error_message=str(e) 

138 ) 

139 

140 if need_to_close is True: 

141 self.disconnect() 

142 

143 return response 

144 

145 def query(self, query: ASTNode) -> Response: 

146 """ 

147 Receive query as AST (abstract syntax tree) and act upon it somehow. 

148 Args: 

149 query (ASTNode): sql query represented as AST. May be any kind 

150 of query: SELECT, INSERT, DELETE, etc 

151 Returns: 

152 HandlerResponse 

153 """ 

154 

155 renderer = SqlalchemyRender(IngresDialect) 

156 query_str = renderer.get_string(query, with_failback=True) 

157 return self.native_query(query_str) 

158 

159 def get_tables(self) -> Response: 

160 """ 

161 Gets a list of table names in the database. 

162 

163 Returns: 

164 list: A list of table names in the database. 

165 """ 

166 connection = self.connect() 

167 cursor = connection.cursor() 

168 # Execute query to get all table names 

169 cursor.execute( 

170 "SELECT table_name FROM iitables WHERE table_type = 'T'") 

171 

172 table_names = [x[0] for x in cursor.fetchall()] 

173 

174 # Create dataframe with table names 

175 df = pd.DataFrame(table_names, columns=['table_name', 'data_type']) 

176 

177 # Create response object 

178 response = Response( 

179 RESPONSE_TYPE.TABLE, 

180 df 

181 ) 

182 

183 return response 

184 

185 def get_columns(self, table_name: str) -> Response: 

186 """ 

187 Gets a list of column names in the specified table. 

188 

189 Args: 

190 table_name (str): The name of the table to get column names from. 

191 

192 Returns: 

193 list: A list of column names in the specified table. 

194 """ 

195 conn = self.connect() 

196 cursor = conn.cursor() 

197 cursor.execute("SELECT column_name FROM iicolumns WHERE table_name = '{}'".format(table_name)) 

198 results = cursor.fetchall() 

199 

200 # construct a pandas dataframe from the query results 

201 df = pd.DataFrame( 

202 results, 

203 columns=['column_name', 'data_type'] 

204 ) 

205 

206 response = Response( 

207 RESPONSE_TYPE.TABLE, 

208 df 

209 ) 

210 

211 return response