Coverage for mindsdb / integrations / handlers / impala_handler / impala_handler.py: 0%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3import pandas as pd 

4from impala import dbapi as db, sqlalchemy as SA 

5 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8from mindsdb_sql_parser.ast.base import ASTNode 

9 

10from mindsdb.utilities import log 

11from mindsdb.integrations.libs.base import DatabaseHandler 

12from mindsdb.integrations.libs.response import ( 

13 HandlerStatusResponse as StatusResponse, 

14 HandlerResponse as Response, 

15 RESPONSE_TYPE 

16) 

17 

18 

# Module-level logger; ImpalaHandler reports connection and query failures through it.
logger = log.getLogger(__name__)

20 

21 

class ImpalaHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Impala statements.
    """

    name = 'impala'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler without opening a connection.

        :param name: the name of this handler instance
        :param connection_data: connection parameters; ``connect`` reads the
            keys ``host``, ``port``, ``user``, ``password`` and ``database``.
            ``None`` is treated as an empty dict.
        :param kwargs: extra arguments, stored unchanged on ``self.kwargs``
        """
        super().__init__(name)

        self.parser = parse_sql
        self.dialect = 'impala'
        self.kwargs = kwargs
        # Normalise None to {} so the .get() lookups below never run on None.
        self.connection_data = connection_data or {}

        self.connection = None
        self.is_connected = False

    def connect(self):
        """
        Open a connection to Impala via ``impala.dbapi.connect``, or reuse the
        one already open.

        :return: the open DB-API connection
        """
        if self.is_connected is True:
            return self.connection

        config = {
            'host': self.connection_data.get('host'),
            # 21050 is used when no port is configured.
            'port': self.connection_data.get('port', 21050),
            'user': self.connection_data.get('user'),
            'password': self.connection_data.get('password'),
            'database': self.connection_data.get('database'),
        }

        connection = db.connect(**config)
        self.is_connected = True
        self.connection = connection
        return self.connection

    def disconnect(self):
        """Close the current connection, if any, and mark the handler as disconnected."""
        if self.is_connected is False:
            return
        self.connection.close()
        self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """
        Check whether a connection to Impala can be established.

        A connection opened only for this check is closed again afterwards.

        :return: a StatusResponse whose ``success`` reflects the outcome and
            whose ``error_message`` holds the exception text on failure
        """
        result = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            connection = self.connect()
            result.success = connection is not None
        except Exception as e:
            # .get(): a missing 'database' key must not raise KeyError here
            # and hide the actual connection error.
            logger.error(f'x x x Error connecting to Impala {self.connection_data.get("database")}, {e}!')
            result.error_message = str(e)

        if result.success is True and need_to_close:
            self.disconnect()
        if result.success is False and self.is_connected is True:
            self.is_connected = False

        return result

    def native_query(self, query: str) -> Response:
        """
        Receive SQL query and run it.

        :param query: The SQL query to run in Impala
        :return: a TABLE response with the fetched rows when the statement
            produces a result set, an OK response when it does not, or an
            ERROR response carrying the exception text when execution fails
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        try:
            with connection.cursor() as cur:
                try:
                    cur.execute(query)
                    # Fetch only when a result set exists: per PEP 249,
                    # fetchall() raises for statements (DDL/DML) that
                    # produce no rows. Fetching first used to turn
                    # successful non-SELECT statements into ERROR responses.
                    if cur.has_result_set:
                        response = Response(
                            RESPONSE_TYPE.TABLE,
                            pd.DataFrame(
                                cur.fetchall(),
                                columns=[x[0] for x in cur.description]
                            )
                        )
                    else:
                        response = Response(RESPONSE_TYPE.OK)
                    connection.commit()
                except Exception as e:
                    # .get(): a missing 'database' key must not raise here
                    # and replace the real query error.
                    logger.error(f'Error running query: {query} on {self.connection_data.get("database")}!')
                    response = Response(
                        RESPONSE_TYPE.ERROR,
                        error_message=str(e)
                    )
        finally:
            # Close a connection opened just for this query, even if cursor
            # creation or cursor cleanup raised.
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Render a parsed SQL AST with the Impala SQLAlchemy dialect and run it.

        :param query: the parsed query to execute
        :return: the response produced by ``native_query``
        """
        renderer = SqlalchemyRender(SA.ImpalaDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables in Impala.

        :return: the ``SHOW TABLES`` response with its ``name`` column renamed
            to ``TABLE_NAME``; responses without a data frame (e.g. errors)
            are returned unchanged
        """
        q = "SHOW TABLES;"
        result = self.native_query(q)
        # Error responses carry no data frame, so there is nothing to rename.
        if result.data_frame is not None:
            result.data_frame = result.data_frame.rename(columns={'name': 'TABLE_NAME'})

        return result

    def get_columns(self, table_name: str) -> Response:
        """
        Show details about the table.

        :param table_name: the table to describe. It is interpolated into the
            statement unquoted, so it must already be a valid Impala table
            reference. NOTE(review): do not pass untrusted input here.
        :return: the ``DESCRIBE`` response reduced to its first two columns,
            with ``name``/``type`` renamed to ``COLUMN_NAME``/``Data_Type``;
            responses without a data frame (e.g. errors) are returned unchanged
        """
        q = f"DESCRIBE {table_name};"

        result = self.native_query(q)
        # Error responses carry no data frame, so there is nothing to reshape.
        if result.data_frame is not None:
            result.data_frame = result.data_frame.iloc[:, 0:2].rename(
                columns={'name': 'COLUMN_NAME', 'type': 'Data_Type'}
            )

        return result