Coverage for mindsdb / integrations / handlers / hive_handler / hive_handler.py: 0%
104 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Text, Dict, Optional
3from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
4from mindsdb_sql_parser.ast.base import ASTNode
5import pandas as pd
6from pyhive import (hive, sqlalchemy_hive)
7from pyhive.exc import OperationalError
8from thrift.transport.TTransport import TTransportException
10from mindsdb.integrations.libs.base import DatabaseHandler
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13 HandlerResponse as Response,
14 RESPONSE_TYPE
15)
16from mindsdb.utilities import log
19logger = log.getLogger(__name__)
class HiveHandler(DatabaseHandler):
    """
    This handler handles the connection and execution of SQL statements on Apache Hive.
    """

    name = 'hive'

    def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs) -> None:
        """
        Initializes the handler.

        Args:
            name (Text): The name of the handler instance.
            connection_data (Dict): The connection data required to connect to the Apache Hive server.
            kwargs: Arbitrary keyword arguments.
        """
        super().__init__(name)
        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

    def __del__(self) -> None:
        """
        Closes the connection when the handler instance is deleted.
        """
        # getattr guards against a partially constructed instance: if __init__ raised before
        # `is_connected` was assigned, the finalizer must not fail with AttributeError.
        if getattr(self, 'is_connected', False):
            self.disconnect()

    def connect(self) -> hive.Connection:
        """
        Establishes a connection to the Apache Hive server.

        An already open connection is reused instead of creating a new one.

        Raises:
            ValueError: If the expected connection parameters are not provided.

        Returns:
            hive.Connection: A connection object to the Apache Hive server.
        """
        if self.is_connected:
            return self.connection

        # `connection_data` is declared Optional; treat None as an empty mapping so that the
        # documented ValueError is raised rather than a TypeError from the membership test.
        connection_data = self.connection_data or {}

        # Mandatory connection parameters.
        required_parameters = ('host', 'database')
        if not all(key in connection_data for key in required_parameters):
            raise ValueError('Required parameters (host, database) must be provided.')

        config = {key: connection_data[key] for key in required_parameters}

        # Optional connection parameters are forwarded only when explicitly supplied.
        for param in ('port', 'username', 'password'):
            if param in connection_data:
                config[param] = connection_data[param]

        # The authentication mode is upper-cased before being handed to the driver.
        config['auth'] = connection_data.get('auth', 'CUSTOM').upper()

        try:
            self.connection = hive.Connection(**config)
            self.is_connected = True
            return self.connection
        except (OperationalError, TTransportException, ValueError) as known_error:
            logger.error(f'Error connecting to Hive {config["database"]}, {known_error}!')
            raise
        except Exception as unknown_error:
            logger.error(f'Unknown error connecting to Hive {config["database"]}, {unknown_error}!')
            raise

    def disconnect(self) -> None:
        """
        Closes the connection to the Apache Hive server if it's currently open.
        """
        if self.is_connected is False:
            return

        self.connection.close()
        self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """
        Checks the status of the connection to the Apache Hive server.

        A connection opened only for this check is closed again before returning.

        Returns:
            StatusResponse: An object containing the success status and an error message if an error occurs.
        """
        response = StatusResponse(False)
        # Remember whether this call has to open the connection itself.
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except (OperationalError, TTransportException, ValueError) as known_error:
            logger.error(f'Connection check to Hive failed, {known_error}!')
            response.error_message = str(known_error)
        except Exception as unknown_error:
            logger.error(f'Connection check to Hive failed due to an unknown error, {unknown_error}!')
            response.error_message = str(unknown_error)

        if response.success is True and need_to_close:
            self.disconnect()
        if response.success is False and self.is_connected is True:
            self.is_connected = False

        return response

    @staticmethod
    def _rollback_quietly(connection) -> None:
        """
        Attempts a rollback without letting a rollback failure replace the query error.

        NOTE(review): PyHive's Hive connection is understood to raise from `rollback()` because
        Hive has no transactions; swallowing that here keeps the ERROR response intact.
        """
        try:
            connection.rollback()
        except Exception as rollback_error:
            logger.debug(f'Rollback skipped, {rollback_error}')

    def native_query(self, query: Text) -> Response:
        """
        Executes a native SQL query on the Apache Hive server and returns the result.

        Args:
            query (Text): The SQL query to be executed.

        Returns:
            Response: A response object containing the result of the query or an error message.
        """
        # Remember whether this call has to open the connection itself.
        need_to_close = self.is_connected is False

        connection = self.connect()
        try:
            with connection.cursor() as cur:
                try:
                    cur.execute(query)
                    # Per DB-API 2.0, `description` is None for statements that produce no
                    # result set; fetching from those would raise instead of returning rows.
                    result = cur.fetchall() if cur.description is not None else None
                    if result:
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            pd.DataFrame(
                                result,
                                # Keep only the part after the last '.' of each reported column name.
                                columns=[x[0].split('.')[-1] for x in cur.description]
                            )
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                    connection.commit()
                except OperationalError as operational_error:
                    logger.error(f'Error running query: {query} on {self.connection_data["database"]}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(operational_error)
                    )
                    self._rollback_quietly(connection)
                except Exception as unknown_error:
                    logger.error(f'Unknown error running query: {query} on {self.connection_data["database"]}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(unknown_error)
                    )
                    self._rollback_quietly(connection)
        finally:
            # Close a connection opened only for this query, even if something above raised.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Executes a SQL query represented by an ASTNode on the Apache Hive server and retrieves the data (if any).

        Args:
            query (ASTNode): An ASTNode representing the SQL query to be executed.

        Returns:
            Response: The response from the `native_query` method, containing the result of the SQL query execution.
        """
        # Render the AST to a SQL string using the Hive SQLAlchemy dialect.
        renderer = SqlalchemyRender(sqlalchemy_hive.HiveDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Retrieves the tables reported by `SHOW TABLES` on the Apache Hive server.

        Returns:
            Response: The response from `native_query`; when it carries a data frame, its first
                column is renamed to 'table_name'.
        """
        result = self.native_query("SHOW TABLES")
        df = result.data_frame
        # Responses without tabular data (error, or no tables) have nothing to rename;
        # return them unchanged instead of failing on a missing data frame.
        if df is not None and len(df.columns) > 0:
            result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
        return result

    def get_columns(self, table_name: Text) -> Response:
        """
        Retrieves column details for a specified table in the Apache Hive server.

        Args:
            table_name (Text): The name of the table for which to retrieve column information.

        Raises:
            ValueError: If the 'table_name' is not a valid string.

        Returns:
            Response: A response object containing the column details.
        """
        if not table_name or not isinstance(table_name, str):
            raise ValueError("Invalid table name provided.")

        # NOTE(review): `table_name` is interpolated verbatim into the statement; callers must
        # pass a trusted identifier until proper identifier quoting is agreed on.
        return self.native_query(f"DESCRIBE {table_name}")