Coverage for mindsdb / integrations / handlers / pinot_handler / pinot_handler.py: 0%
100 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3import pandas as pd
4import pinotdb
5import requests
6from requests.exceptions import InvalidSchema
7import json
9from mindsdb_sql_parser import parse_sql
10from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
11from mindsdb.integrations.libs.base import DatabaseHandler
12from pinotdb.sqlalchemy import PinotDialect
14from mindsdb_sql_parser.ast.base import ASTNode
16from mindsdb.utilities import log
17from mindsdb.integrations.libs.response import (
18 HandlerStatusResponse as StatusResponse,
19 HandlerResponse as Response,
20 RESPONSE_TYPE
21)
# Module-level logger, used by PinotHandler to report connection and query errors.
logger = log.getLogger(__name__)
class PinotHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Apache Pinot statements.

    SQL is sent to the Pinot broker through ``pinotdb``; table and column
    metadata are read from the Pinot controller's REST API with ``requests``.
    """

    name = 'pinot'

    # Seconds to wait for a controller REST response; without a timeout
    # ``requests`` can block indefinitely on an unresponsive host.
    _CONTROLLER_TIMEOUT = 30

    # Sections of a Pinot table schema whose entries describe table columns.
    _SCHEMA_FIELD_SPEC_KEYS = ('dimensionFieldSpecs', 'metricFieldSpecs', 'dateTimeFieldSpecs')

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.

        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                Read keys: 'host', 'broker_port', 'controller_port' and,
                optionally, 'path', 'scheme' (default 'http'), 'username',
                'password' (default None) and 'verify_ssl' (default 'False').
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'pinot'

        # Work on a copy so defaults are not written into the caller's dict,
        # and tolerate None since the signature allows it.
        connection_data = dict(connection_data or {})
        for parameter in ('username', 'password'):
            connection_data.setdefault(parameter, None)
        connection_data.setdefault('verify_ssl', 'False')
        connection_data.setdefault('scheme', 'http')

        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

    def __del__(self):
        # getattr: __init__ may have failed before is_connected was assigned.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    @staticmethod
    def _parse_verify_ssl(value) -> bool:
        """
        Interpret the 'verify_ssl' connection parameter as a bool.

        Accepts real booleans (e.g. from JSON) as well as strings such as
        'true'/'false' (case-insensitive), including the default 'False'.

        Raises:
            ValueError: if the value cannot be interpreted as a boolean.
        """
        if isinstance(value, bool):
            return value
        if value is None:
            return False
        normalized = str(value).strip().lower()
        if normalized in ('true', '1', 'yes', 'on'):
            return True
        if normalized in ('false', '0', 'no', 'off', ''):
            return False
        raise ValueError(f"Invalid value for 'verify_ssl': {value!r}")

    def connect(self):
        """
        Set up the connection required by the handler.

        Returns:
            The ``pinotdb`` connection object; an existing one is reused
            while the handler is marked as connected.
        """
        if self.is_connected is True:
            return self.connection

        connect_args = {
            'host': self.connection_data['host'],
            'port': self.connection_data['broker_port'],
            'scheme': self.connection_data['scheme'],
            'username': self.connection_data['username'],
            'password': self.connection_data['password'],
            'verify_ssl': self._parse_verify_ssl(self.connection_data['verify_ssl']),
        }
        # Only forward 'path' when supplied so pinotdb can apply its own
        # default broker query path otherwise.
        if self.connection_data.get('path') is not None:
            connect_args['path'] = self.connection_data['path']

        self.connection = pinotdb.connect(**connect_args)
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """
        Close any existing connection and mark the handler as disconnected.
        """
        connection = getattr(self, 'connection', None)
        self.connection = None
        self.is_connected = False
        if connection is not None:
            try:
                connection.close()
            except Exception as e:
                # Best effort: the connection may already have been closed.
                logger.warning(f'Error closing Pinot connection: {e}')

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        Returns:
            HandlerStatusResponse: success flag plus error message on failure.
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            # NOTE(review): pinotdb.connect may not contact the broker at all;
            # confirm whether this actually validates reachability.
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Pinot, {e}!')
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Execute a raw SQL query on the Pinot broker.

        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: a TABLE response when the query yields a result
            set (possibly with zero rows), OK when it yields no result set,
            or ERROR carrying the exception message.
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        cursor = connection.cursor()

        try:
            cursor.execute(query)
            result = cursor.fetchall()
            description = cursor.description
            # A described result set with zero rows is still a table: keep the
            # column names instead of reporting a bare OK.
            if result or description:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(
                        result,
                        columns=[column[0] for column in description]
                    )
                )
            else:
                connection.commit()
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f'Error running query: {query} on Pinot!')
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )
        finally:
            cursor.close()
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Render a query AST to Pinot SQL and execute it.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender(PinotDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def _controller_url(self, path: str) -> str:
        """
        Build a Pinot controller REST URL for ``path`` (expected to start with '/').

        If 'host' already contains a scheme ('://') it is used as is;
        otherwise the configured 'scheme' is prepended.

        Raises:
            InvalidSchema: if a scheme must be prepended and it is not http(s).
        """
        host = str(self.connection_data['host'])
        url = f"{host}:{self.connection_data['controller_port']}{path}"
        if '://' in host:
            return url
        scheme = str(self.connection_data['scheme'])
        if scheme.lower() not in ('http', 'https'):
            raise InvalidSchema(f'Unsupported scheme for the Pinot controller: {scheme!r}')
        return f'{scheme}://{url}'

    def _controller_get(self, path: str):
        """
        GET a controller REST endpoint and return its decoded JSON body.

        Raises:
            requests.HTTPError: if the controller answers with an error status.
        """
        result = requests.get(self._controller_url(path), timeout=self._CONTROLLER_TIMEOUT)
        result.raise_for_status()
        return json.loads(result.content)

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.

        Reads the controller's ``/tables`` endpoint.

        Returns:
            HandlerResponse: TABLE response with a single 'table_name' column.
        """
        tables = self._controller_get('/tables')['tables']
        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(tables, columns=['table_name'])
        )

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.

        Reads the table schema from the controller and combines its
        dimension, metric and date-time field specs, in that order.

        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse: TABLE response with 'column_name' and 'data_type'
            columns, plus any other attributes present in the field specs.
        """
        schema = self._controller_get(f'/tables/{table_name}/schema')

        field_specs = []
        for key in self._SCHEMA_FIELD_SPEC_KEYS:
            # A schema may omit any section (e.g. a table without metrics).
            field_specs.extend(schema.get(key) or [])

        if field_specs:
            df = pd.DataFrame(field_specs).rename(
                columns={'name': 'column_name', 'dataType': 'data_type'}
            )
        else:
            df = pd.DataFrame(columns=['column_name', 'data_type'])

        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=df
        )