Coverage for mindsdb / integrations / handlers / db2_handler / db2_handler.py: 0%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Text, Dict, Optional, Any 

2 

3import ibm_db_dbi 

4from ibm_db_dbi import OperationalError, ProgrammingError 

5from ibm_db_sa.ibm_db import DB2Dialect_ibm_db as DB2Dialect 

6from mindsdb_sql_parser.ast.base import ASTNode 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8import pandas as pd 

9 

10from mindsdb.integrations.libs.base import DatabaseHandler 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13 HandlerResponse as Response, 

14 RESPONSE_TYPE, 

15) 

16from mindsdb.utilities import log 

17 

18 

19logger = log.getLogger(__name__) 

20 

21 

22class DB2Handler(DatabaseHandler): 

23 name = "db2" 

24 

25 def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs: Any) -> None: 

26 """ 

27 Initializes the handler. 

28 Args: 

29 name (Text): The name of the handler instance. 

30 connection_data (Dict): The connection data required to connect to the IBM Db2 database. 

31 kwargs: Arbitrary keyword arguments. 

32 """ 

33 super().__init__(name) 

34 self.connection_data = connection_data 

35 self.kwargs = kwargs 

36 

37 self.connection = None 

38 self.is_connected = False 

39 

40 def __del__(self) -> None: 

41 """ 

42 Closes the connection when the handler instance is deleted. 

43 """ 

44 if self.is_connected: 

45 self.disconnect() 

46 

47 def connect(self) -> ibm_db_dbi.Connection: 

48 """ 

49 Establishes a connection to a IBM Db2 database. 

50 

51 Raises: 

52 ValueError: If the required connection parameters are not provided. 

53 ibm_db_dbi.OperationalError: If an error occurs while connecting to the IBM Db2 database. 

54 

55 Returns: 

56 ibm_db_dbi.Connection: A connection object to the IBM Db2 database. 

57 """ 

58 if self.is_connected: 

59 return self.connection 

60 

61 # Mandatory connection parameters. 

62 if not all(key in self.connection_data for key in ["host", "user", "password", "database"]): 

63 raise ValueError("Required parameters (host, user, password, database) must be provided.") 

64 cloud = "databases.appdomain.cloud" in self.connection_data["host"] 

65 if cloud: 

66 connection_string = f"DATABASE={self.connection_data['database']};HOSTNAME={self.connection_data['host']};PORT={self.connection_data['port']};PROTOCOL=TCPIP;UID={self.connection_data['user']};PWD={self.connection_data['password']};SECURITY=SSL;" 

67 connection_string += "SSLSERVERCERTIFICATE=;" 

68 else: 

69 connection_string = f"DRIVER={'IBM DB2 ODBC DRIVER'};DATABASE={self.connection_data['database']};HOST={self.connection_data['host']};PROTOCOL=TCPIP;UID={self.connection_data['user']};PWD={self.connection_data['password']};" 

70 

71 # Optional connection parameters. 

72 if "port" in self.connection_data: 

73 connection_string += f"PORT={self.connection_data['port']};" 

74 

75 if "schema" in self.connection_data: 

76 connection_string += f"CURRENTSCHEMA={self.connection_data['schema']};" 

77 

78 try: 

79 self.connection = ibm_db_dbi.pconnect(connection_string, "", "") 

80 self.is_connected = True 

81 return self.connection 

82 except OperationalError as operational_error: 

83 logger.error(f"Error while connecting to {self.connection_data.get('database')}, {operational_error}!") 

84 raise 

85 except Exception as unknown_error: 

86 logger.error(f"Unknown error while connecting to {self.connection_data.get('database')}, {unknown_error}!") 

87 raise 

88 

89 def disconnect(self) -> None: 

90 """ 

91 Closes the connection to the IBM Db2 database if it's currently open. 

92 """ 

93 if not self.is_connected: 

94 return 

95 

96 self.connection.close() 

97 self.is_connected = False 

98 

99 def check_connection(self) -> StatusResponse: 

100 """ 

101 Checks the status of the connection to the IBM Db2 database. 

102 

103 Returns: 

104 StatusResponse: An object containing the success status and an error message if an error occurs. 

105 """ 

106 response = StatusResponse(False) 

107 need_to_close = self.is_connected is False 

108 

109 try: 

110 self.connect() 

111 response.success = True 

112 except (OperationalError, ValueError) as known_error: 

113 logger.error(f"Connection check to IBM Db2 failed, {known_error}!") 

114 response.error_message = str(known_error) 

115 except Exception as unknown_error: 

116 logger.error(f"Connection check to IBM Db2 failed due to an unknown error, {unknown_error}!") 

117 response.error_message = str(unknown_error) 

118 

119 if response.success and need_to_close: 

120 self.disconnect() 

121 

122 elif not response.success and self.is_connected: 

123 self.is_connected = False 

124 

125 return response 

126 

127 def native_query(self, query: Text) -> Response: 

128 """ 

129 Executes a SQL query on the IBM Db2 database and returns the result (if any). 

130 

131 Args: 

132 query (str): The SQL query to be executed. 

133 

134 Returns: 

135 Response: A response object containing the result of the query or an error message. 

136 """ 

137 need_to_close = self.is_connected is False 

138 

139 connection = self.connect() 

140 with connection.cursor() as cur: 

141 try: 

142 cur.execute(query) 

143 

144 if cur._result_set_produced: 

145 result = cur.fetchall() 

146 response = Response( 

147 RESPONSE_TYPE.TABLE, 

148 data_frame=pd.DataFrame(result, columns=[x[0] for x in cur.description]), 

149 ) 

150 else: 

151 response = Response(RESPONSE_TYPE.OK) 

152 connection.commit() 

153 except (OperationalError, ProgrammingError) as known_error: 

154 logger.error(f"Error running query: {query} on {self.connection_data.get('database')}!") 

155 response = Response(RESPONSE_TYPE.ERROR, error_message=str(known_error)) 

156 connection.rollback() 

157 

158 except Exception as unknown_error: 

159 logger.error(f"Unknown error running query: {query} on {self.connection_data.get('database')}!") 

160 response = Response(RESPONSE_TYPE.ERROR, error_message=str(unknown_error)) 

161 connection.rollback() 

162 

163 if need_to_close is True: 

164 self.disconnect() 

165 

166 return response 

167 

168 def query(self, query: ASTNode) -> Response: 

169 """ 

170 Executes a SQL query represented by an ASTNode on the IBM Db2 database and retrieves the data (if any). 

171 

172 Args: 

173 query (ASTNode): An ASTNode representing the SQL query to be executed. 

174 

175 Returns: 

176 Response: The response from the `native_query` method, containing the result of the SQL query execution. 

177 """ 

178 renderer = SqlalchemyRender(DB2Dialect) 

179 query_str = renderer.get_string(query, with_failback=True) 

180 return self.native_query(query_str) 

181 

182 def get_tables(self) -> Response: 

183 """ 

184 Retrieves a list of all non-system tables and views in the current schema of the IBM Db2 database. 

185 

186 Returns: 

187 Response: A response object containing the list of tables and views, formatted as per the `Response` class. 

188 """ 

189 connection = self.connect() 

190 

191 result = connection.tables(connection.current_schema) 

192 

193 tables = [] 

194 for table in result: 

195 tables.append( 

196 { 

197 "TABLE_NAME": table["TABLE_NAME"], 

198 "TABLE_SCHEMA": table["TABLE_SCHEM"], 

199 "TABLE_TYPE": table["TABLE_TYPE"], 

200 } 

201 ) 

202 

203 response = Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(tables)) 

204 

205 return response 

206 

207 def get_columns(self, table_name: Text) -> Response: 

208 """ 

209 Retrieves column details for a specified table in the IBM Db2 database. 

210 

211 Args: 

212 table_name (Text): The name of the table for which to retrieve column information. 

213 

214 Raises: 

215 ValueError: If the 'table_name' is not a valid string. 

216 

217 Returns: 

218 Response: A response object containing the column details. 

219 """ 

220 if not table_name or not isinstance(table_name, str): 

221 raise ValueError("Invalid table name provided.") 

222 

223 connection = self.connect() 

224 

225 result = connection.columns(table_name=table_name) 

226 

227 columns = [column["COLUMN_NAME"] for column in result] 

228 

229 response = Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(columns, columns=["COLUMN_NAME"])) 

230 

231 return response