Coverage for mindsdb / integrations / handlers / teradata_handler / teradata_handler.py: 0%

103 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Any, Dict, Text 

2 

3from mindsdb_sql_parser.ast.base import ASTNode 

4from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

5from pandas import DataFrame 

6import teradatasql 

7from teradatasql import OperationalError 

8import teradatasqlalchemy.dialect as teradata_dialect 

9 

10from mindsdb.integrations.libs.base import DatabaseHandler 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13 HandlerResponse as Response, 

14 RESPONSE_TYPE 

15) 

16from mindsdb.utilities import log 

17 

18 

19logger = log.getLogger(__name__) 

20 

21 

22class TeradataHandler(DatabaseHandler): 

23 """ 

24 This handler handles the connection and execution of SQL statements on Teradata. 

25 """ 

26 

27 name = 'teradata' 

28 

29 def __init__(self, name: Text, connection_data: Dict, **kwargs: Any) -> None: 

30 """ 

31 Initializes the handler. 

32 

33 Args: 

34 name (Text): The name of the handler instance. 

35 connection_data (Dict): The connection data required to connect to the Teradata database. 

36 kwargs: Arbitrary keyword arguments. 

37 """ 

38 super().__init__(name) 

39 self.connection_data = connection_data 

40 self.kwargs = kwargs 

41 

42 self.connection = None 

43 self.is_connected = False 

44 

45 def __del__(self) -> None: 

46 """ 

47 Closes the connection when the handler instance is deleted. 

48 """ 

49 if self.is_connected is True: 

50 self.disconnect() 

51 

52 def connect(self) -> teradatasql.TeradataConnection: 

53 """ 

54 Establishes a connection to the Teradata database. 

55 

56 Raises: 

57 ValueError: If the expected connection parameters are not provided. 

58 teradatasql.OperationalError: If an error occurs while connecting to the Teradata database. 

59 

60 Returns: 

61 teradatasql.TeradataConnection: A connection object to the Teradata database. 

62 """ 

63 if self.is_connected is True: 

64 return self.connection 

65 

66 # Mandatory connection parameters. 

67 if not all(key in self.connection_data for key in ['host', 'user', 'password']): 

68 raise ValueError('Required parameters (host, user, password) must be provided.') 

69 

70 config = { 

71 'host': self.connection_data.get('host'), 

72 'user': self.connection_data.get('user'), 

73 'password': self.connection_data.get('password') 

74 } 

75 

76 # Optional connection parameters. 

77 if 'database' in self.connection_data: 

78 config['database'] = self.connection_data.get('database') 

79 

80 try: 

81 self.connection = teradatasql.connect( 

82 **config, 

83 ) 

84 self.is_connected = True 

85 return self.connection 

86 except OperationalError as operational_error: 

87 logger.error(f'Error connecting to Teradata, {operational_error}!') 

88 raise 

89 except Exception as unknown_error: 

90 logger.error(f'Unknown error connecting to Teradata, {unknown_error}!') 

91 raise 

92 

93 def disconnect(self) -> None: 

94 """ 

95 Closes the connection to the Teradata database if it's currently open. 

96 """ 

97 if self.is_connected is False: 

98 return 

99 

100 self.connection.close() 

101 self.is_connected = False 

102 return 

103 

104 def check_connection(self) -> StatusResponse: 

105 """ 

106 Checks the status of the connection to the Teradata database. 

107 

108 Returns: 

109 StatusResponse: An object containing the success status and an error message if an error occurs. 

110 """ 

111 response = StatusResponse(False) 

112 need_to_close = self.is_connected is False 

113 

114 try: 

115 connection = self.connect() 

116 with connection.cursor() as cur: 

117 cur.execute('SELECT 1 FROM (SELECT 1 AS "dual") AS "dual"') 

118 response.success = True 

119 except (OperationalError, ValueError) as known_error: 

120 logger.error(f'Connection check to Teradata failed, {known_error}!') 

121 response.error_message = str(known_error) 

122 except Exception as unknown_error: 

123 logger.error(f'Connection check to Teradata failed due to an unknown error, {unknown_error}!') 

124 response.error_message = str(unknown_error) 

125 

126 if response.success is True and need_to_close: 

127 self.disconnect() 

128 if response.success is False and self.is_connected is True: 

129 self.is_connected = False 

130 

131 return response 

132 

133 def native_query(self, query: Text) -> Response: 

134 """ 

135 Executes a native SQL query on the Teradata database and returns the result. 

136 

137 Args: 

138 query (Text): The SQL query to be executed. 

139 

140 Returns: 

141 Response: A response object containing the result of the query or an error message. 

142 """ 

143 need_to_close = self.is_connected is False 

144 

145 connection = self.connect() 

146 with connection.cursor() as cur: 

147 try: 

148 cur.execute(query) 

149 if not cur.description: 

150 response = Response(RESPONSE_TYPE.OK) 

151 else: 

152 result = cur.fetchall() 

153 response = Response( 

154 RESPONSE_TYPE.TABLE, 

155 DataFrame( 

156 result, 

157 columns=[x[0] for x in cur.description] 

158 ) 

159 ) 

160 connection.commit() 

161 except OperationalError as operational_error: 

162 logger.error(f'Error running query: {query} on {self.connection_data["database"]}!') 

163 response = Response( 

164 RESPONSE_TYPE.ERROR, 

165 error_message=str(operational_error) 

166 ) 

167 connection.rollback() 

168 except Exception as unknown_error: 

169 logger.error(f'Unknown error running query: {query} on {self.connection_data["database"]}!') 

170 response = Response( 

171 RESPONSE_TYPE.ERROR, 

172 error_message=str(unknown_error) 

173 ) 

174 connection.rollback() 

175 

176 if need_to_close is True: 

177 self.disconnect() 

178 

179 return response 

180 

181 def query(self, query: ASTNode) -> Response: 

182 """ 

183 Executes a SQL query represented by an ASTNode on the Teradata database and retrieves the data (if any). 

184 

185 Args: 

186 query (ASTNode): An ASTNode representing the SQL query to be executed. 

187 

188 Returns: 

189 Response: The response from the `native_query` method, containing the result of the SQL query execution. 

190 """ 

191 renderer = SqlalchemyRender(teradata_dialect.TeradataDialect) 

192 query_str = renderer.get_string(query, with_failback=True) 

193 return self.native_query(query_str) 

194 

195 def get_tables(self) -> Response: 

196 """ 

197 Retrieves a list of all non-system tables in the Teradata database. 

198 

199 Returns: 

200 Response: A response object containing a list of tables in the Teradata database. 

201 """ 

202 query = f""" 

203 SELECT 

204 TableName AS table_name, 

205 TableKind AS table_type 

206 FROM DBC.TablesV 

207 WHERE DatabaseName = '{self.connection_data.get('database') if self.connection_data.get('database') else self.connection_data.get('user')}' 

208 AND (TableKind = 'T' 

209 OR TableKind = 'O' 

210 OR TableKind = 'Q' 

211 OR TableKind = 'V') 

212 """ 

213 result = self.native_query(query) 

214 

215 df = result.data_frame 

216 df['table_type'] = df['table_type'].apply(lambda x: 'VIEW' if x == 'V' else 'BASE TABLE') 

217 

218 result.data_frame = df 

219 return result 

220 

221 def get_columns(self, table_name: Text) -> Response: 

222 """ 

223 Retrieves column details for a specified table in the Teradata database. 

224 

225 Args: 

226 table_name (Text): The name of the table for which to retrieve column information. 

227 

228 Raises: 

229 ValueError: If the 'table_name' is not a valid string. 

230 

231 Returns: 

232 Response: A response object containing the column details. 

233 """ 

234 if not table_name or not isinstance(table_name, str): 

235 raise ValueError("Invalid table name provided.") 

236 

237 query = f""" 

238 SELECT ColumnName AS "Field", 

239 ColumnType AS "Type" 

240 FROM DBC.ColumnsV 

241 WHERE DatabaseName = '{self.connection_data.get('database') if self.connection_data.get('database') else self.connection_data.get('user')}' 

242 AND TableName = '{table_name}' 

243 """ 

244 

245 return self.native_query(query)