Coverage for mindsdb / integrations / handlers / phoenix_handler / phoenix_handler.py: 0%
100 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3import pandas as pd
4import phoenixdb
6from mindsdb_sql_parser import parse_sql
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb.integrations.libs.base import DatabaseHandler
9from pyphoenix.sqlalchemy_phoenix import PhoenixDialect
11from mindsdb_sql_parser.ast.base import ASTNode
13from mindsdb.utilities import log
14from mindsdb.integrations.libs.response import (
15 HandlerStatusResponse as StatusResponse,
16 HandlerResponse as Response,
17 RESPONSE_TYPE
18)
21logger = log.getLogger(__name__)
class PhoenixHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Apache Phoenix statements.

    A connection is opened lazily with ``phoenixdb.connect`` and cached on the
    instance. Methods that open a connection themselves close it again before
    returning; a connection that was already open is left open.
    """

    name = 'phoenix'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                ``url`` is read by connect(); the remaining connection
                parameters are optional and default to None.
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'phoenix'

        # Work on a copy: the caller's dict is not mutated, and a None
        # argument (allowed by the Optional annotation) no longer raises.
        connection_data = dict(connection_data or {})
        optional_parameters = (
            'max_retries', 'autocommit', 'auth', 'authentication',
            'avatica_user', 'avatica_password', 'user', 'password',
        )
        for parameter in optional_parameters:
            connection_data.setdefault(parameter, None)

        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

    def __del__(self):
        # getattr: __init__ may have failed before is_connected was assigned.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def connect(self) -> 'phoenixdb.connection.Connection':
        """
        Set up the connection required by the handler.

        The connection is created on the first call and reused while
        ``is_connected`` is True.
        Returns:
            The phoenixdb connection object (not a HandlerStatusResponse).
        Raises:
            KeyError: if ``url`` is missing from the connection data.
            Exception: whatever ``phoenixdb.connect`` raises on failure.
        """
        if self.is_connected is True:
            return self.connection

        self.connection = phoenixdb.connect(
            url=self.connection_data['url'],
            max_retries=self.connection_data['max_retries'],
            autocommit=self.connection_data['autocommit'],
            auth=self.connection_data['auth'],
            authentication=self.connection_data['authentication'],
            avatica_user=self.connection_data['avatica_user'],
            avatica_password=self.connection_data['avatica_password'],
            user=self.connection_data['user'],
            password=self.connection_data['password']
        )
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """
        Close any existing connection and switch ``self.is_connected`` off.
        Returns:
            bool: the connection flag after the call (always False).
        """
        if self.is_connected is False:
            return self.is_connected

        try:
            self.connection.close()
        finally:
            # Reset the flag even when close() raises, so a broken
            # connection is never handed out again by connect().
            self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        NOTE: when a connection is already cached, connect() returns it
        without contacting the server, so success then only reflects the
        cached state.
        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to the Phoenix Query Server, {e}!')
            response.error_message = str(e)
        finally:
            # Only close a connection this call opened itself.
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def _execute(self, query: str, build_response) -> Response:
        """
        Run ``query`` on a fresh cursor and turn the outcome into a response.

        Failures while executing the query or building the response are
        logged and returned as an ERROR response. Failures while connecting
        or creating the cursor propagate to the caller.
        Args:
            query (str): query in native format
            build_response (callable): called as
                ``build_response(connection, cursor)`` after a successful
                execute; must return a HandlerResponse.
        Returns:
            HandlerResponse
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(query)
                response = build_response(connection, cursor)
            except Exception as e:
                logger.error(f'Error running query: {query} on the Phoenix Query Server, {e}!')
                response = Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=str(e)
                )
            finally:
                cursor.close()
        finally:
            # Runs on every path so a connection opened here never leaks.
            if need_to_close is True:
                self.disconnect()

        return response

    @staticmethod
    def _rows_response(connection, cursor) -> Response:
        """Build a TABLE response from the result set, or commit and return OK."""
        description = cursor.description
        # PEP 249: description is None for statements that return no rows,
        # and fetching without a result set is an error, so only fetch when
        # columns are described. An empty SELECT still yields a TABLE
        # response carrying the column names.
        if description:
            return Response(
                RESPONSE_TYPE.TABLE,
                data_frame=pd.DataFrame(
                    cursor.fetchall(),
                    columns=[x[0] for x in description]
                )
            )
        connection.commit()
        return Response(RESPONSE_TYPE.OK)

    @staticmethod
    def _columns_response(connection, cursor) -> Response:
        """Build a TABLE response listing the first two fields of each description entry."""
        cursor.fetchall()
        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(
                [(x[0], x[1]) for x in cursor.description],
                columns=['column_name', 'data_type']
            )
        )

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: TABLE with a data frame when the statement
            produced a result set, OK (after a commit) when it did not,
            ERROR with the exception text when execution failed.
        """
        return self._execute(query, self._rows_response)

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender(PhoenixDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.

        Reads SYSTEM.CATALOG and drops every row whose schema is SYSTEM.
        Returns:
            HandlerResponse: TABLE with a single ``table_name`` column, or
            the unmodified response when the catalog query returned no
            data frame (e.g. an ERROR response).
        """
        query = """
            SELECT DISTINCT TABLE_NAME, TABLE_SCHEM FROM SYSTEM.CATALOG
        """
        result = self.native_query(query)
        df = result.data_frame
        if df is None:
            # Nothing to post-process; hand the response back as is instead
            # of failing on a None frame.
            return result

        df = df[df['TABLE_SCHEM'] != 'SYSTEM']
        df = df.drop('TABLE_SCHEM', axis=1)
        result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
        return result

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse: TABLE with ``column_name`` and ``data_type``
            columns taken from the cursor description, or ERROR with the
            exception text when the probe query failed.
        """
        # NOTE(review): table_name is interpolated into the SQL text as is;
        # callers must only pass trusted identifiers.
        query = f"SELECT * from {table_name} LIMIT 5"
        return self._execute(query, self._columns_response)