Coverage for mindsdb/integrations/handlers/impala_handler/impala_handler.py: 0%
81 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3import pandas as pd
4from impala import dbapi as db, sqlalchemy as SA
6from mindsdb_sql_parser import parse_sql
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb_sql_parser.ast.base import ASTNode
10from mindsdb.utilities import log
11from mindsdb.integrations.libs.base import DatabaseHandler
12from mindsdb.integrations.libs.response import (
13 HandlerStatusResponse as StatusResponse,
14 HandlerResponse as Response,
15 RESPONSE_TYPE
16)
# Module-level logger obtained from the MindsDB logging utility, named after this module.
logger = log.getLogger(__name__)
class ImpalaHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Impala statements.
    """

    name = 'impala'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Store the connection settings; no connection is opened here.

        :param name: handler instance name, forwarded to the base class
        :param connection_data: connection settings; `connect` reads the keys
            'host', 'port', 'user', 'password' and 'database' from it
        :param kwargs: extra keyword arguments, kept on the instance as `self.kwargs`
        """
        super().__init__(name)

        self.parser = parse_sql
        self.dialect = 'impala'
        self.kwargs = kwargs
        self.connection_data = connection_data

        self.connection = None
        self.is_connected = False

    def _database_label(self):
        """
        Return the configured database name for use in log messages.

        Never raises: yields None when `connection_data` is not a dict or has
        no 'database' key, so that logging inside an `except` block cannot
        itself fail and mask the original error.
        """
        if isinstance(self.connection_data, dict):
            return self.connection_data.get('database')
        return None

    def connect(self):
        """
        Open a connection to Impala, or return the one that is already open.

        :return: the connection object created by `impala.dbapi.connect`
        """
        if self.is_connected is True:
            return self.connection

        config = {
            'host': self.connection_data.get('host'),
            'port': self.connection_data.get('port', 21050),
            'user': self.connection_data.get('user'),
            'password': self.connection_data.get('password'),
            'database': self.connection_data.get('database'),
        }

        connection = db.connect(**config)
        self.is_connected = True
        self.connection = connection
        return self.connection

    def disconnect(self):
        """
        Close the current connection, if one is open.
        """
        if self.is_connected is False:
            return
        self.connection.close()
        self.is_connected = False
        return

    def check_connection(self) -> StatusResponse:
        """
        Check whether a connection to Impala can be established.

        A connection opened only for this check is closed again before
        returning; a connection that was already open is left open.

        :return: status response with `success` set, and `error_message`
            filled in when connecting raised an exception
        """
        result = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            connection = self.connect()
            result.success = connection is not None
        except Exception as e:
            # Safe lookup: 'database' is optional in the settings, and a KeyError
            # here would escape instead of producing a failed status.
            logger.error(f'x x x Error connecting to Impala {self._database_label()}, {e}!')
            result.error_message = str(e)

        if result.success is True and need_to_close:
            self.disconnect()
        if result.success is False and self.is_connected is True:
            self.is_connected = False

        return result

    def native_query(self, query: str) -> Response:
        """
        Receive SQL query and runs it
        :param query: The SQL query to run in Impala
        :return: a TABLE response with the fetched rows when the statement
            produced a result set, an OK response when it did not, or an
            ERROR response carrying the exception text when execution failed
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        with connection.cursor() as cur:
            try:
                cur.execute(query)
                if cur.has_result_set:
                    # Fetch only when there is a result set: fetching after a
                    # statement without one (DDL/DML) fails and would turn a
                    # successful statement into an ERROR response.
                    result = cur.fetchall()
                    response = Response(
                        RESPONSE_TYPE.TABLE,
                        pd.DataFrame(
                            result,
                            columns=[x[0] for x in cur.description]
                        )
                    )
                else:
                    response = Response(RESPONSE_TYPE.OK)
                connection.commit()
            except Exception as e:
                logger.error(f'Error running query: {query} on {self._database_label()}!')
                response = Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=str(e)
                )
                # NOTE(review): no rollback is issued here (it was commented out
                # originally); presumably the driver does not support it - confirm.

        if need_to_close is True:
            self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Retrieve the data from the SQL statement.

        :param query: parsed statement; rendered to a SQL string with the
            Impala SQLAlchemy dialect and passed to `native_query`
        :return: the response produced by `native_query`
        """
        renderer = SqlalchemyRender(SA.ImpalaDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables in Impala

        :return: the response of `SHOW TABLES;` with the 'name' column renamed
            to 'TABLE_NAME'; returned unchanged when it carries no data frame
            (for example when the query failed)
        """
        q = "SHOW TABLES;"
        result = self.native_query(q)
        if result.data_frame is None:
            # Error/OK responses have nothing to rename; pass them through.
            return result

        result.data_frame = result.data_frame.rename(columns={'name': 'TABLE_NAME'})
        return result

    def get_columns(self, table_name: str) -> Response:
        """
        Show details about the table

        :param table_name: table to describe
        :return: the response of `DESCRIBE <table_name>;` reduced to its first
            two columns, with 'name' and 'type' renamed to 'COLUMN_NAME' and
            'Data_Type'; returned unchanged when it carries no data frame
            (for example when the query failed)
        """
        # NOTE(review): table_name is interpolated into the statement unquoted;
        # callers must pass a trusted identifier.
        q = f"DESCRIBE {table_name};"

        result = self.native_query(q)
        if result.data_frame is None:
            # Error/OK responses have nothing to slice; pass them through.
            return result

        result.data_frame = result.data_frame.iloc[:, 0:2].rename(
            columns={'name': 'COLUMN_NAME', 'type': 'Data_Type'}
        )
        return result