Coverage for mindsdb / integrations / handlers / phoenix_handler / phoenix_handler.py: 0%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3import pandas as pd 

4import phoenixdb 

5 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8from mindsdb.integrations.libs.base import DatabaseHandler 

9from pyphoenix.sqlalchemy_phoenix import PhoenixDialect 

10 

11from mindsdb_sql_parser.ast.base import ASTNode 

12 

13from mindsdb.utilities import log 

14from mindsdb.integrations.libs.response import ( 

15 HandlerStatusResponse as StatusResponse, 

16 HandlerResponse as Response, 

17 RESPONSE_TYPE 

18) 

19 

20 

21logger = log.getLogger(__name__) 

22 

23 

24class PhoenixHandler(DatabaseHandler): 

25 """ 

26 This handler handles connection and execution of the Apache Phoenix statements. 

27 """ 

28 

29 name = 'phoenix' 

30 

31 def __init__(self, name: str, connection_data: Optional[dict], **kwargs): 

32 """ 

33 Initialize the handler. 

34 Args: 

35 name (str): name of particular handler instance 

36 connection_data (dict): parameters for connecting to the database 

37 **kwargs: arbitrary keyword arguments. 

38 """ 

39 super().__init__(name) 

40 self.parser = parse_sql 

41 self.dialect = 'phoenix' 

42 

43 optional_parameters = ['max_retries', 'autocommit', 'auth', 'authentication', 'avatica_user', 'avatica_password', 'user', 'password'] 

44 for parameter in optional_parameters: 

45 if parameter not in connection_data: 

46 connection_data[parameter] = None 

47 

48 self.connection_data = connection_data 

49 self.kwargs = kwargs 

50 

51 self.connection = None 

52 self.is_connected = False 

53 

54 def __del__(self): 

55 if self.is_connected is True: 

56 self.disconnect() 

57 

58 def connect(self) -> StatusResponse: 

59 """ 

60 Set up the connection required by the handler. 

61 Returns: 

62 HandlerStatusResponse 

63 """ 

64 

65 if self.is_connected is True: 

66 return self.connection 

67 

68 self.connection = phoenixdb.connect( 

69 url=self.connection_data['url'], 

70 max_retries=self.connection_data['max_retries'], 

71 autocommit=self.connection_data['autocommit'], 

72 auth=self.connection_data['auth'], 

73 authentication=self.connection_data['authentication'], 

74 avatica_user=self.connection_data['avatica_user'], 

75 avatica_password=self.connection_data['avatica_password'], 

76 user=self.connection_data['user'], 

77 password=self.connection_data['password'] 

78 ) 

79 self.is_connected = True 

80 

81 return self.connection 

82 

83 def disconnect(self): 

84 """ Close any existing connections 

85 

86 Should switch self.is_connected. 

87 """ 

88 

89 if self.is_connected is False: 

90 return 

91 

92 self.connection.close() 

93 self.is_connected = False 

94 return self.is_connected 

95 

96 def check_connection(self) -> StatusResponse: 

97 """ 

98 Check connection to the handler. 

99 Returns: 

100 HandlerStatusResponse 

101 """ 

102 

103 response = StatusResponse(False) 

104 need_to_close = self.is_connected is False 

105 

106 try: 

107 self.connect() 

108 response.success = True 

109 except Exception as e: 

110 logger.error(f'Error connecting to the Phoenix Query Server, {e}!') 

111 response.error_message = str(e) 

112 finally: 

113 if response.success is True and need_to_close: 

114 self.disconnect() 

115 if response.success is False and self.is_connected is True: 

116 self.is_connected = False 

117 

118 return response 

119 

120 def native_query(self, query: str) -> StatusResponse: 

121 """ 

122 Receive raw query and act upon it somehow. 

123 Args: 

124 query (str): query in native format 

125 Returns: 

126 HandlerResponse 

127 """ 

128 

129 need_to_close = self.is_connected is False 

130 

131 connection = self.connect() 

132 cursor = connection.cursor() 

133 

134 try: 

135 cursor.execute(query) 

136 result = cursor.fetchall() 

137 if result: 

138 response = Response( 

139 RESPONSE_TYPE.TABLE, 

140 data_frame=pd.DataFrame( 

141 result, 

142 columns=[x[0] for x in cursor.description] 

143 ) 

144 ) 

145 else: 

146 connection.commit() 

147 response = Response(RESPONSE_TYPE.OK) 

148 except Exception as e: 

149 logger.error(f'Error running query: {query} on the Phoenix Query Server!') 

150 response = Response( 

151 RESPONSE_TYPE.ERROR, 

152 error_message=str(e) 

153 ) 

154 

155 cursor.close() 

156 if need_to_close is True: 

157 self.disconnect() 

158 

159 return response 

160 

161 def query(self, query: ASTNode) -> StatusResponse: 

162 """ 

163 Receive query as AST (abstract syntax tree) and act upon it somehow. 

164 Args: 

165 query (ASTNode): sql query represented as AST. May be any kind 

166 of query: SELECT, INTSERT, DELETE, etc 

167 Returns: 

168 HandlerResponse 

169 """ 

170 

171 renderer = SqlalchemyRender(PhoenixDialect) 

172 query_str = renderer.get_string(query, with_failback=True) 

173 return self.native_query(query_str) 

174 

175 def get_tables(self) -> StatusResponse: 

176 """ 

177 Return list of entities that will be accessible as tables. 

178 Returns: 

179 HandlerResponse 

180 """ 

181 

182 query = """ 

183 SELECT DISTINCT TABLE_NAME, TABLE_SCHEM FROM SYSTEM.CATALOG 

184 """ 

185 result = self.native_query(query) 

186 df = result.data_frame 

187 df = df[df['TABLE_SCHEM'] != 'SYSTEM'] 

188 df = df.drop('TABLE_SCHEM', axis=1) 

189 result.data_frame = df.rename(columns={df.columns[0]: 'table_name'}) 

190 return result 

191 

192 def get_columns(self, table_name: str) -> StatusResponse: 

193 """ 

194 Returns a list of entity columns. 

195 Args: 

196 table_name (str): name of one of tables returned by self.get_tables() 

197 Returns: 

198 HandlerResponse 

199 """ 

200 

201 need_to_close = self.is_connected is False 

202 

203 connection = self.connect() 

204 cursor = connection.cursor() 

205 

206 try: 

207 query = f"SELECT * from {table_name} LIMIT 5" 

208 cursor.execute(query) 

209 cursor.fetchall() 

210 

211 response = Response( 

212 RESPONSE_TYPE.TABLE, 

213 data_frame=pd.DataFrame( 

214 [(x[0], x[1]) for x in cursor.description], 

215 columns=['column_name', 'data_type'] 

216 ) 

217 ) 

218 

219 except Exception as e: 

220 logger.error(f'Error running query: {query} on the Phoenix Query Server!') 

221 response = Response( 

222 RESPONSE_TYPE.ERROR, 

223 error_message=str(e) 

224 ) 

225 

226 cursor.close() 

227 if need_to_close is True: 

228 self.disconnect() 

229 

230 return response