Coverage for mindsdb / integrations / handlers / lindorm_handler / lindorm_handler.py: 0%
101 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
2import pandas as pd
3import phoenixdb
4from mindsdb_sql_parser import parse_sql
5from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
6from mindsdb.integrations.libs.base import DatabaseHandler
7from pyphoenix.sqlalchemy_phoenix import PhoenixDialect
8from mindsdb_sql_parser.ast.base import ASTNode
9from mindsdb.utilities import log
10from mindsdb.integrations.libs.response import (
11 HandlerStatusResponse as StatusResponse,
12 HandlerResponse as Response,
13 RESPONSE_TYPE
14)
16logger = log.getLogger(__name__)
19class LindormHandler(DatabaseHandler):
20 """
21 This handler handles connection and execution of the Apache Phoenix statements.
22 """
24 name = 'lindorm'
26 def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
27 """
28 Initialize the handler.
29 Args:
30 name (str): name of particular handler instance
31 connection_data (dict): parameters for connecting to the database
32 **kwargs: arbitrary keyword arguments.
33 """
34 super().__init__(name)
35 self.parser = parse_sql
36 self.dialect = 'phoenix'
37 optional_parameters = ['autocommit', 'lindorm_user', 'lindorm_password']
38 for parameter in optional_parameters:
39 if parameter not in connection_data:
40 connection_data[parameter] = None
42 self.connection_data = connection_data
43 self.kwargs = kwargs
45 self.connection = None
46 self.is_connected = False
48 def __del__(self):
49 if self.is_connected is True:
50 self.disconnect()
52 def connect(self) -> StatusResponse:
53 """
54 Set up the connection required by the handler.
55 Returns:
56 HandlerStatusResponse
57 """
59 if self.is_connected is True:
60 return self.connection
62 lindorm_connection_data = {'lindorm_user': self.connection_data['lindorm_user'], 'lindorm_password': self.connection_data['lindorm_password']}
64 self.connection = phoenixdb.connect(
65 url=self.connection_data['url'],
66 autocommit=self.connection_data['autocommit'],
67 **lindorm_connection_data
68 )
69 self.is_connected = True
71 return self.connection
73 def disconnect(self):
74 """ Close any existing connections
76 Should switch self.is_connected.
77 """
79 if self.is_connected is False:
80 return
82 self.connection.close()
83 self.is_connected = False
84 return self.is_connected
86 def check_connection(self) -> StatusResponse:
87 """
88 Check connection to the handler.
89 Returns:
90 HandlerStatusResponse
91 """
93 response = StatusResponse(False)
94 need_to_close = self.is_connected is False
96 try:
97 self.connect()
98 response.success = True
99 except Exception as e:
100 logger.error(f'Error connecting to the Phoenix Query Server, {e}!')
101 response.error_message = str(e)
102 finally:
103 if response.success is True and need_to_close:
104 self.disconnect()
105 if response.success is False and self.is_connected is True:
106 self.is_connected = False
108 return response
110 def native_query(self, query: str) -> StatusResponse:
111 """
112 Receive raw query and act upon it somehow.
113 Args:
114 query (str): query in native format
115 Returns:
116 HandlerResponse
117 """
119 need_to_close = self.is_connected is False
121 connection = self.connect()
122 cursor = connection.cursor()
124 try:
125 cursor.execute(query)
126 result = cursor.fetchall()
127 if result:
128 response = Response(
129 RESPONSE_TYPE.TABLE,
130 data_frame=pd.DataFrame(
131 result,
132 columns=[x[0] for x in cursor.description]
133 )
134 )
135 else:
136 connection.commit()
137 response = Response(RESPONSE_TYPE.OK)
138 except Exception as e:
139 logger.error(f'Error running query: {query} on the Lindorm Query Server!')
140 response = Response(
141 RESPONSE_TYPE.ERROR,
142 error_message=str(e)
143 )
145 cursor.close()
146 if need_to_close is True:
147 self.disconnect()
149 return response
151 def query(self, query: ASTNode) -> StatusResponse:
152 """
153 Receive query as AST (abstract syntax tree) and act upon it somehow.
154 Args:
155 query (ASTNode): sql query represented as AST. May be any kind
156 of query: SELECT, INTSERT, DELETE, etc
157 Returns:
158 HandlerResponse
159 """
161 renderer = SqlalchemyRender(PhoenixDialect)
162 query_str = renderer.get_string(query, with_failback=True)
163 return self.native_query(query_str)
165 def get_tables(self) -> StatusResponse:
166 """
167 Return list of entities that will be accessible as tables.
168 Returns:
169 HandlerResponse
170 """
172 query = """
173 SELECT DISTINCT TABLE_NAME, TABLE_SCHEM FROM SYSTEM.CATALOG
174 """
175 result = self.native_query(query)
176 df = result.data_frame
177 df = df[df['TABLE_SCHEM'] != 'SYSTEM']
178 df = df.drop('TABLE_SCHEM', axis=1)
179 result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
180 return result
182 def get_columns(self, table_name: str) -> StatusResponse:
183 """
184 Returns a list of entity columns.
185 Args:
186 table_name (str): name of one of tables returned by self.get_tables()
187 Returns:
188 HandlerResponse
189 """
191 need_to_close = self.is_connected is False
193 connection = self.connect()
194 cursor = connection.cursor()
196 try:
197 query = f"SELECT * from {table_name} LIMIT 5"
198 cursor.execute(query)
199 cursor.fetchall()
201 response = Response(
202 RESPONSE_TYPE.TABLE,
203 data_frame=pd.DataFrame(
204 [(x[0], x[1]) for x in cursor.description],
205 columns=['column_name', 'data_type']
206 )
207 )
209 except Exception as e:
210 logger.error(f'Error running query: {query} on the Phoenix Query Server!')
211 response = Response(
212 RESPONSE_TYPE.ERROR,
213 error_message=str(e)
214 )
216 cursor.close()
217 if need_to_close is True:
218 self.disconnect()
220 return response