Coverage for mindsdb / integrations / handlers / db2_handler / db2_handler.py: 0%
110 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Text, Dict, Optional, Any
3import ibm_db_dbi
4from ibm_db_dbi import OperationalError, ProgrammingError
5from ibm_db_sa.ibm_db import DB2Dialect_ibm_db as DB2Dialect
6from mindsdb_sql_parser.ast.base import ASTNode
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8import pandas as pd
10from mindsdb.integrations.libs.base import DatabaseHandler
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13 HandlerResponse as Response,
14 RESPONSE_TYPE,
15)
16from mindsdb.utilities import log
19logger = log.getLogger(__name__)
class DB2Handler(DatabaseHandler):
    """
    Handler that connects to an IBM Db2 database and runs SQL statements on it.
    """

    name = "db2"

    def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs: Any) -> None:
        """
        Initializes the handler.

        Args:
            name (Text): The name of the handler instance.
            connection_data (Dict): The connection data required to connect to the IBM Db2 database.
            kwargs: Arbitrary keyword arguments.
        """
        super().__init__(name)
        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

    def __del__(self) -> None:
        """
        Closes the connection when the handler instance is deleted.
        """
        # `__del__` also runs for instances whose `__init__` raised before
        # `is_connected` was assigned, hence the defensive lookup.
        if getattr(self, "is_connected", False):
            self.disconnect()

    def connect(self) -> ibm_db_dbi.Connection:
        """
        Establishes a connection to a IBM Db2 database.

        Hosts containing "databases.appdomain.cloud" are connected to over SSL
        and need an explicit port; for every other host the port is optional.

        Raises:
            ValueError: If the required connection parameters are not provided.
            ibm_db_dbi.OperationalError: If an error occurs while connecting to the IBM Db2 database.

        Returns:
            ibm_db_dbi.Connection: A connection object to the IBM Db2 database.
        """
        if self.is_connected:
            return self.connection

        # `connection_data` is declared Optional; treat None as "nothing provided"
        # so the caller gets the documented ValueError rather than a TypeError.
        params = self.connection_data or {}

        # Mandatory connection parameters.
        if not all(key in params for key in ["host", "user", "password", "database"]):
            raise ValueError("Required parameters (host, user, password, database) must be provided.")

        cloud = "databases.appdomain.cloud" in params["host"]
        if cloud:
            # The cloud connection string always embeds the port, so it is
            # mandatory here; fail with a clear message instead of a KeyError.
            if "port" not in params:
                raise ValueError("Required parameter (port) must be provided for IBM Db2 on Cloud hosts.")
            connection_string = (
                f"DATABASE={params['database']};"
                f"HOSTNAME={params['host']};"
                f"PORT={params['port']};"
                "PROTOCOL=TCPIP;"
                f"UID={params['user']};"
                f"PWD={params['password']};"
                "SECURITY=SSL;"
                "SSLSERVERCERTIFICATE=;"
            )
        else:
            connection_string = (
                "DRIVER=IBM DB2 ODBC DRIVER;"
                f"DATABASE={params['database']};"
                f"HOST={params['host']};"
                "PROTOCOL=TCPIP;"
                f"UID={params['user']};"
                f"PWD={params['password']};"
            )
            # Optional for non-cloud hosts. The cloud string above already
            # carries PORT, so it is only appended on this branch.
            if "port" in params:
                connection_string += f"PORT={params['port']};"

        # Optional connection parameters.
        if "schema" in params:
            connection_string += f"CURRENTSCHEMA={params['schema']};"

        try:
            # Credentials are part of the connection string, so the separate
            # user/password arguments are left empty.
            self.connection = ibm_db_dbi.pconnect(connection_string, "", "")
            self.is_connected = True
            return self.connection
        except OperationalError as operational_error:
            logger.error(f"Error while connecting to {params.get('database')}, {operational_error}!")
            raise
        except Exception as unknown_error:
            logger.error(f"Unknown error while connecting to {params.get('database')}, {unknown_error}!")
            raise

    def disconnect(self) -> None:
        """
        Closes the connection to the IBM Db2 database if it's currently open.
        """
        if not self.is_connected:
            return

        try:
            self.connection.close()
        finally:
            # Even if close() fails the handle is no longer trustworthy, so the
            # next connect() must build a fresh connection.
            self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """
        Checks the status of the connection to the IBM Db2 database.

        A connection opened only for this check is closed again before returning.

        Returns:
            StatusResponse: An object containing the success status and an error message if an error occurs.
        """
        response = StatusResponse(False)
        need_to_close = not self.is_connected

        try:
            self.connect()
            response.success = True
        except (OperationalError, ValueError) as known_error:
            logger.error(f"Connection check to IBM Db2 failed, {known_error}!")
            response.error_message = str(known_error)
        except Exception as unknown_error:
            logger.error(f"Connection check to IBM Db2 failed due to an unknown error, {unknown_error}!")
            response.error_message = str(unknown_error)

        if response.success and need_to_close:
            self.disconnect()
        elif not response.success and self.is_connected:
            self.is_connected = False

        return response

    def native_query(self, query: Text) -> Response:
        """
        Executes a SQL query on the IBM Db2 database and returns the result (if any).

        The statement is committed on success and rolled back on failure. A
        connection opened only for this call is closed before returning, also
        when an unexpected error escapes.

        Args:
            query (str): The SQL query to be executed.

        Returns:
            Response: A response object containing the result of the query or an error message.
        """
        need_to_close = not self.is_connected

        connection = self.connect()
        try:
            with connection.cursor() as cur:
                try:
                    cur.execute(query)

                    if cur._result_set_produced:
                        rows = cur.fetchall()
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            data_frame=pd.DataFrame(rows, columns=[col[0] for col in cur.description]),
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                    connection.commit()
                except (OperationalError, ProgrammingError) as known_error:
                    logger.error(f"Error running query: {query} on {self.connection_data.get('database')}!")
                    response = Response(RESPONSE_TYPE.ERROR, error_message=str(known_error))
                    connection.rollback()
                except Exception as unknown_error:
                    logger.error(f"Unknown error running query: {query} on {self.connection_data.get('database')}!")
                    response = Response(RESPONSE_TYPE.ERROR, error_message=str(unknown_error))
                    connection.rollback()
        finally:
            # Runs even when cursor(), commit() or rollback() raises, so a
            # connection opened just for this call is never left dangling.
            if need_to_close:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Executes a SQL query represented by an ASTNode on the IBM Db2 database and retrieves the data (if any).

        Args:
            query (ASTNode): An ASTNode representing the SQL query to be executed.

        Returns:
            Response: The response from the `native_query` method, containing the result of the SQL query execution.
        """
        renderer = SqlalchemyRender(DB2Dialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Retrieves the tables reported by the driver for the connection's current schema.

        Returns:
            Response: A table response with the columns TABLE_NAME, TABLE_SCHEMA and TABLE_TYPE;
                the columns are present even when the schema contains no tables.
        """
        connection = self.connect()

        result = connection.tables(connection.current_schema)

        tables = [
            {
                "TABLE_NAME": table["TABLE_NAME"],
                # The driver metadata key is TABLE_SCHEM; expose it as TABLE_SCHEMA.
                "TABLE_SCHEMA": table["TABLE_SCHEM"],
                "TABLE_TYPE": table["TABLE_TYPE"],
            }
            for table in result
        ]

        # Explicit columns keep the frame's shape stable for an empty result.
        data_frame = pd.DataFrame(tables, columns=["TABLE_NAME", "TABLE_SCHEMA", "TABLE_TYPE"])

        return Response(RESPONSE_TYPE.TABLE, data_frame=data_frame)

    def get_columns(self, table_name: Text) -> Response:
        """
        Retrieves the column names of a specified table in the IBM Db2 database.

        Args:
            table_name (Text): The name of the table for which to retrieve column information.

        Raises:
            ValueError: If the 'table_name' is not a valid string.

        Returns:
            Response: A table response with a single COLUMN_NAME column.
        """
        if not table_name or not isinstance(table_name, str):
            raise ValueError("Invalid table name provided.")

        connection = self.connect()

        result = connection.columns(table_name=table_name)

        column_names = [column["COLUMN_NAME"] for column in result]

        return Response(RESPONSE_TYPE.TABLE, data_frame=pd.DataFrame(column_names, columns=["COLUMN_NAME"]))