Coverage for mindsdb / integrations / handlers / hive_handler / hive_handler.py: 0%

104 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Text, Dict, Optional 

2 

3from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

4from mindsdb_sql_parser.ast.base import ASTNode 

5import pandas as pd 

6from pyhive import (hive, sqlalchemy_hive) 

7from pyhive.exc import OperationalError 

8from thrift.transport.TTransport import TTransportException 

9 

10from mindsdb.integrations.libs.base import DatabaseHandler 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13 HandlerResponse as Response, 

14 RESPONSE_TYPE 

15) 

16from mindsdb.utilities import log 

17 

18 

19logger = log.getLogger(__name__) 

20 

21 

class HiveHandler(DatabaseHandler):
    """
    This handler handles the connection and execution of SQL statements on Apache Hive.
    """

    name = 'hive'

    def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs) -> None:
        """
        Initializes the handler.

        Args:
            name (Text): The name of the handler instance.
            connection_data (Dict): The connection data required to connect to the Apache Hive server.
            kwargs: Arbitrary keyword arguments.
        """
        super().__init__(name)
        self.connection_data = connection_data
        self.kwargs = kwargs

        # The connection is created lazily by `connect()`.
        self.connection = None
        self.is_connected = False

    def __del__(self) -> None:
        """
        Closes the connection when the handler instance is deleted.
        """
        # `getattr` guards against a partially constructed instance: if `__init__`
        # failed before `is_connected` was assigned, the attribute does not exist.
        if getattr(self, 'is_connected', False):
            self.disconnect()

    def connect(self) -> hive.Connection:
        """
        Establishes a connection to the Apache Hive server.

        An already open connection is reused instead of creating a new one.

        Raises:
            ValueError: If the expected connection parameters are not provided.

        Returns:
            hive.Connection: A connection object to the Apache Hive server.
        """
        if self.is_connected:
            return self.connection

        # `connection_data` is declared Optional, so treat `None` as "nothing provided"
        # and report it through the documented ValueError.
        connection_data = self.connection_data or {}

        # Mandatory connection parameters.
        if not all(key in connection_data for key in ('host', 'database')):
            raise ValueError('Required parameters (host, database) must be provided.')

        config = {
            'host': connection_data.get('host'),
            'database': connection_data.get('database'),
        }

        # Optional connection parameters are forwarded only when supplied.
        for param in ('port', 'username', 'password'):
            if param in connection_data:
                config[param] = connection_data[param]

        # A missing, null or empty `auth` falls back to the 'CUSTOM' default.
        config['auth'] = str(connection_data.get('auth') or 'CUSTOM').upper()

        try:
            self.connection = hive.Connection(**config)
            self.is_connected = True
            return self.connection
        except (OperationalError, TTransportException, ValueError) as known_error:
            logger.error(f'Error connecting to Hive {config["database"]}, {known_error}!')
            raise
        except Exception as unknown_error:
            logger.error(f'Unknown error connecting to Hive {config["database"]}, {unknown_error}!')
            raise

    def disconnect(self) -> None:
        """
        Closes the connection to the Apache Hive server if it's currently open.
        """
        if not self.is_connected:
            return

        try:
            self.connection.close()
        finally:
            # Mark the handler as disconnected even if `close()` fails, so a broken
            # connection is never handed out again by `connect()`.
            self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """
        Checks the status of the connection to the Apache Hive server.

        Returns:
            StatusResponse: An object containing the success status and an error message if an error occurs.
        """
        response = StatusResponse(False)
        # Only close the connection afterwards if this check is what opened it.
        need_to_close = not self.is_connected

        try:
            self.connect()
            response.success = True
        except (OperationalError, TTransportException, ValueError) as known_error:
            logger.error(f'Connection check to Hive failed, {known_error}!')
            response.error_message = str(known_error)
        except Exception as unknown_error:
            logger.error(f'Connection check to Hive failed due to an unknown error, {unknown_error}!')
            response.error_message = str(unknown_error)

        if response.success and need_to_close:
            self.disconnect()
        elif not response.success and self.is_connected:
            self.is_connected = False

        return response

    @staticmethod
    def _rollback_quietly(connection) -> None:
        """
        Attempts a rollback without letting a rollback failure mask the original query error.

        Args:
            connection: The connection on which the failed query was executed.
        """
        # NOTE(review): PyHive's `Connection.rollback()` is understood to raise
        # NotSupportedError because Hive has no transactions - confirm against the
        # installed PyHive version. Either way, a failure here must not replace the
        # error response that is being built for the caller.
        try:
            connection.rollback()
        except Exception as rollback_error:
            logger.debug(f'Rollback skipped: {rollback_error}')

    def native_query(self, query: Text) -> Response:
        """
        Executes a native SQL query on the Apache Hive server and returns the result.

        Statements that produce a result set return a TABLE response (possibly with zero rows);
        statements without a result set return an OK response. Query failures are reported as
        an ERROR response rather than raised.

        Args:
            query (Text): The SQL query to be executed.

        Returns:
            Response: A response object containing the result of the query or an error message.
        """
        # Only close the connection afterwards if this call is what opened it.
        need_to_close = not self.is_connected

        connection = self.connect()
        try:
            with connection.cursor() as cur:
                try:
                    cur.execute(query)
                    # Per DB-API, `description` is None when the statement produced no
                    # result set; fetching is only attempted when there is one.
                    description = cur.description
                    if description:
                        rows = cur.fetchall()
                        # Keep only the last dotted component of each column name
                        # (e.g. 'table.col' becomes 'col').
                        columns = [column[0].split('.')[-1] for column in description]
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            pd.DataFrame(rows, columns=columns)
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                    connection.commit()
                except OperationalError as operational_error:
                    logger.error(f'Error running query: {query} on {self.connection_data["database"]}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(operational_error)
                    )
                    self._rollback_quietly(connection)
                except Exception as unknown_error:
                    logger.error(f'Unknown error running query: {query} on {self.connection_data["database"]}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(unknown_error)
                    )
                    self._rollback_quietly(connection)
        finally:
            # Release a connection opened by this call even if something unexpected escapes.
            if need_to_close:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Executes a SQL query represented by an ASTNode on the Apache Hive server and retrieves the data (if any).

        Args:
            query (ASTNode): An ASTNode representing the SQL query to be executed.

        Returns:
            Response: The response from the `native_query` method, containing the result of the SQL query execution.
        """
        # Render the AST into a SQL string using the Hive SQLAlchemy dialect.
        renderer = SqlalchemyRender(sqlalchemy_hive.HiveDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Retrieves a list of the tables in the Apache Hive server by running `SHOW TABLES`.

        Returns:
            Response: A response object containing a list of tables in the Apache Hive server.
                If the query failed or returned no data frame, the response is returned unchanged.
        """
        result = self.native_query("SHOW TABLES")

        df = getattr(result, 'data_frame', None)
        # Error/OK responses carry no data frame; there is nothing to rename in that case.
        if df is None or len(df.columns) == 0:
            return result

        # Expose the first column under the name 'table_name'.
        result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
        return result

    def get_columns(self, table_name: Text) -> Response:
        """
        Retrieves column details for a specified table in the Apache Hive server.

        Args:
            table_name (Text): The name of the table for which to retrieve column information.

        Raises:
            ValueError: If the 'table_name' is not a valid string.

        Returns:
            Response: A response object containing the column details.
        """
        if not table_name or not isinstance(table_name, str):
            raise ValueError("Invalid table name provided.")

        # NOTE(review): `table_name` is interpolated into the statement as-is; callers
        # must pass a trusted identifier.
        return self.native_query(f"DESCRIBE {table_name}")