Coverage for mindsdb / integrations / handlers / teradata_handler / teradata_handler.py: 0%
103 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Any, Dict, Text
3from mindsdb_sql_parser.ast.base import ASTNode
4from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
5from pandas import DataFrame
6import teradatasql
7from teradatasql import OperationalError
8import teradatasqlalchemy.dialect as teradata_dialect
10from mindsdb.integrations.libs.base import DatabaseHandler
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13 HandlerResponse as Response,
14 RESPONSE_TYPE
15)
16from mindsdb.utilities import log
19logger = log.getLogger(__name__)
22class TeradataHandler(DatabaseHandler):
23 """
24 This handler handles the connection and execution of SQL statements on Teradata.
25 """
27 name = 'teradata'
29 def __init__(self, name: Text, connection_data: Dict, **kwargs: Any) -> None:
30 """
31 Initializes the handler.
33 Args:
34 name (Text): The name of the handler instance.
35 connection_data (Dict): The connection data required to connect to the Teradata database.
36 kwargs: Arbitrary keyword arguments.
37 """
38 super().__init__(name)
39 self.connection_data = connection_data
40 self.kwargs = kwargs
42 self.connection = None
43 self.is_connected = False
45 def __del__(self) -> None:
46 """
47 Closes the connection when the handler instance is deleted.
48 """
49 if self.is_connected is True:
50 self.disconnect()
52 def connect(self) -> teradatasql.TeradataConnection:
53 """
54 Establishes a connection to the Teradata database.
56 Raises:
57 ValueError: If the expected connection parameters are not provided.
58 teradatasql.OperationalError: If an error occurs while connecting to the Teradata database.
60 Returns:
61 teradatasql.TeradataConnection: A connection object to the Teradata database.
62 """
63 if self.is_connected is True:
64 return self.connection
66 # Mandatory connection parameters.
67 if not all(key in self.connection_data for key in ['host', 'user', 'password']):
68 raise ValueError('Required parameters (host, user, password) must be provided.')
70 config = {
71 'host': self.connection_data.get('host'),
72 'user': self.connection_data.get('user'),
73 'password': self.connection_data.get('password')
74 }
76 # Optional connection parameters.
77 if 'database' in self.connection_data:
78 config['database'] = self.connection_data.get('database')
80 try:
81 self.connection = teradatasql.connect(
82 **config,
83 )
84 self.is_connected = True
85 return self.connection
86 except OperationalError as operational_error:
87 logger.error(f'Error connecting to Teradata, {operational_error}!')
88 raise
89 except Exception as unknown_error:
90 logger.error(f'Unknown error connecting to Teradata, {unknown_error}!')
91 raise
93 def disconnect(self) -> None:
94 """
95 Closes the connection to the Teradata database if it's currently open.
96 """
97 if self.is_connected is False:
98 return
100 self.connection.close()
101 self.is_connected = False
102 return
104 def check_connection(self) -> StatusResponse:
105 """
106 Checks the status of the connection to the Teradata database.
108 Returns:
109 StatusResponse: An object containing the success status and an error message if an error occurs.
110 """
111 response = StatusResponse(False)
112 need_to_close = self.is_connected is False
114 try:
115 connection = self.connect()
116 with connection.cursor() as cur:
117 cur.execute('SELECT 1 FROM (SELECT 1 AS "dual") AS "dual"')
118 response.success = True
119 except (OperationalError, ValueError) as known_error:
120 logger.error(f'Connection check to Teradata failed, {known_error}!')
121 response.error_message = str(known_error)
122 except Exception as unknown_error:
123 logger.error(f'Connection check to Teradata failed due to an unknown error, {unknown_error}!')
124 response.error_message = str(unknown_error)
126 if response.success is True and need_to_close:
127 self.disconnect()
128 if response.success is False and self.is_connected is True:
129 self.is_connected = False
131 return response
133 def native_query(self, query: Text) -> Response:
134 """
135 Executes a native SQL query on the Teradata database and returns the result.
137 Args:
138 query (Text): The SQL query to be executed.
140 Returns:
141 Response: A response object containing the result of the query or an error message.
142 """
143 need_to_close = self.is_connected is False
145 connection = self.connect()
146 with connection.cursor() as cur:
147 try:
148 cur.execute(query)
149 if not cur.description:
150 response = Response(RESPONSE_TYPE.OK)
151 else:
152 result = cur.fetchall()
153 response = Response(
154 RESPONSE_TYPE.TABLE,
155 DataFrame(
156 result,
157 columns=[x[0] for x in cur.description]
158 )
159 )
160 connection.commit()
161 except OperationalError as operational_error:
162 logger.error(f'Error running query: {query} on {self.connection_data["database"]}!')
163 response = Response(
164 RESPONSE_TYPE.ERROR,
165 error_message=str(operational_error)
166 )
167 connection.rollback()
168 except Exception as unknown_error:
169 logger.error(f'Unknown error running query: {query} on {self.connection_data["database"]}!')
170 response = Response(
171 RESPONSE_TYPE.ERROR,
172 error_message=str(unknown_error)
173 )
174 connection.rollback()
176 if need_to_close is True:
177 self.disconnect()
179 return response
181 def query(self, query: ASTNode) -> Response:
182 """
183 Executes a SQL query represented by an ASTNode on the Teradata database and retrieves the data (if any).
185 Args:
186 query (ASTNode): An ASTNode representing the SQL query to be executed.
188 Returns:
189 Response: The response from the `native_query` method, containing the result of the SQL query execution.
190 """
191 renderer = SqlalchemyRender(teradata_dialect.TeradataDialect)
192 query_str = renderer.get_string(query, with_failback=True)
193 return self.native_query(query_str)
195 def get_tables(self) -> Response:
196 """
197 Retrieves a list of all non-system tables in the Teradata database.
199 Returns:
200 Response: A response object containing a list of tables in the Teradata database.
201 """
202 query = f"""
203 SELECT
204 TableName AS table_name,
205 TableKind AS table_type
206 FROM DBC.TablesV
207 WHERE DatabaseName = '{self.connection_data.get('database') if self.connection_data.get('database') else self.connection_data.get('user')}'
208 AND (TableKind = 'T'
209 OR TableKind = 'O'
210 OR TableKind = 'Q'
211 OR TableKind = 'V')
212 """
213 result = self.native_query(query)
215 df = result.data_frame
216 df['table_type'] = df['table_type'].apply(lambda x: 'VIEW' if x == 'V' else 'BASE TABLE')
218 result.data_frame = df
219 return result
221 def get_columns(self, table_name: Text) -> Response:
222 """
223 Retrieves column details for a specified table in the Teradata database.
225 Args:
226 table_name (Text): The name of the table for which to retrieve column information.
228 Raises:
229 ValueError: If the 'table_name' is not a valid string.
231 Returns:
232 Response: A response object containing the column details.
233 """
234 if not table_name or not isinstance(table_name, str):
235 raise ValueError("Invalid table name provided.")
237 query = f"""
238 SELECT ColumnName AS "Field",
239 ColumnType AS "Type"
240 FROM DBC.ColumnsV
241 WHERE DatabaseName = '{self.connection_data.get('database') if self.connection_data.get('database') else self.connection_data.get('user')}'
242 AND TableName = '{table_name}'
243 """
245 return self.native_query(query)