Coverage for mindsdb / integrations / handlers / pinot_handler / pinot_handler.py: 0%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3import pandas as pd 

4import pinotdb 

5import requests 

6from requests.exceptions import InvalidSchema 

7import json 

8 

9from mindsdb_sql_parser import parse_sql 

10from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

11from mindsdb.integrations.libs.base import DatabaseHandler 

12from pinotdb.sqlalchemy import PinotDialect 

13 

14from mindsdb_sql_parser.ast.base import ASTNode 

15 

16from mindsdb.utilities import log 

17from mindsdb.integrations.libs.response import ( 

18 HandlerStatusResponse as StatusResponse, 

19 HandlerResponse as Response, 

20 RESPONSE_TYPE 

21) 

22 

23 

# Module-level logger used by PinotHandler to report connection and query errors.
logger = log.getLogger(__name__)

25 

26 

class PinotHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Apache Pinot statements.

    SQL statements are sent to the Pinot broker (``broker_port``) through
    ``pinotdb``; table and schema metadata are read over HTTP from the Pinot
    controller REST API (``controller_port``).
    """

    name = 'pinot'

    # Keys of a Pinot schema document whose entries (lists of field specs)
    # describe queryable columns; all of them are reported by get_columns().
    _SCHEMA_FIELD_SPEC_KEYS = ('dimensionFieldSpecs', 'metricFieldSpecs', 'dateTimeFieldSpecs')

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                Used keys: 'host', 'broker_port', 'controller_port'; optional
                'path' (broker query path, pinotdb default when omitted),
                'scheme' (default 'http'), 'username', 'password' (default None)
                and 'verify_ssl' (default 'False').
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'pinot'

        # Work on a copy so filling in defaults never mutates the caller's dict,
        # and accept None as the Optional annotation advertises.
        connection_data = dict(connection_data or {})
        connection_data.setdefault('username', None)
        connection_data.setdefault('password', None)
        connection_data.setdefault('verify_ssl', 'False')
        connection_data.setdefault('scheme', 'http')

        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

    def __del__(self):
        # getattr: __init__ may have raised before is_connected was assigned.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    @staticmethod
    def _parse_bool(value) -> bool:
        """
        Interpret a connection flag given either as a bool or as text.

        Args:
            value: a bool, None (treated as False), or a string/number such as
                'true'/'false', '1'/'0', 'yes'/'no', 'on'/'off' (case-insensitive).
        Returns:
            bool: the parsed flag.
        Raises:
            ValueError: if the value cannot be interpreted as a boolean.
        """
        if isinstance(value, bool):
            return value
        if value is None:
            return False
        normalized = str(value).strip().lower()
        if normalized in ('true', '1', 'yes', 'on'):
            return True
        if normalized in ('false', '0', 'no', 'off', ''):
            return False
        raise ValueError(f'Cannot interpret {value!r} as a boolean')

    def connect(self):
        """
        Set up the connection required by the handler.

        An existing connection is reused while the handler is connected.

        Returns:
            The connection object returned by ``pinotdb.connect``.
        """
        if self.is_connected is True:
            return self.connection

        params = {
            'host': self.connection_data['host'],
            'port': self.connection_data['broker_port'],
            'scheme': self.connection_data['scheme'],
            'username': self.connection_data['username'],
            'password': self.connection_data['password'],
            # Accepts real booleans as well as textual values such as 'False'.
            'verify_ssl': self._parse_bool(self.connection_data['verify_ssl']),
        }
        # Only forward 'path' when configured, letting pinotdb use its own default otherwise.
        if self.connection_data.get('path') is not None:
            params['path'] = self.connection_data['path']

        self.connection = pinotdb.connect(**params)
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """ Close any existing connections

        Closes the pinotdb connection (if one is open), drops the reference to it
        and switches self.is_connected to False.
        """
        connection = self.connection
        self.connection = None
        self.is_connected = False
        if connection is not None:
            try:
                connection.close()
            except Exception as e:
                # Best effort: a failure to close must not break callers or __del__.
                logger.warning(f'Error closing Pinot connection: {e}')

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        NOTE(review): this only calls self.connect(); if pinotdb.connect does not
        contact the broker, this validates the parameters rather than reachability.

        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Pinot, {e}!')
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: a TABLE response when the statement produced rows or a
            result description (possibly with zero rows), OK otherwise, or ERROR
            with the exception text if execution failed.
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        cursor = connection.cursor()

        try:
            cursor.execute(query)
            result = cursor.fetchall()
            description = cursor.description
            if result or description:
                # An empty SELECT still yields a (column-named) empty table.
                columns = [column[0] for column in description] if description else None
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(result, columns=columns)
                )
            else:
                connection.commit()
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f'Error running query: {query} on Pinot!')
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )
        finally:
            # Release the cursor (and a connection opened just for this call) on every path.
            cursor.close()
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender(PinotDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def _controller_url(self, path: str) -> str:
        """
        Build the URL of a Pinot controller REST endpoint.

        The configured 'scheme' is prepended unless 'host' already contains one
        (e.g. 'https://pinot.example.com').
        """
        host = str(self.connection_data['host'])
        base_url = f"{host}:{self.connection_data['controller_port']}"
        if '://' not in host:
            base_url = f"{self.connection_data['scheme']}://{base_url}"
        return f'{base_url}/{path}'

    def _controller_get_json(self, path: str):
        """
        GET a controller endpoint and return its decoded JSON body.

        Raises:
            requests.HTTPError: if the controller answers with an error status.
        """
        result = requests.get(self._controller_url(path))
        result.raise_for_status()
        return result.json()

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.
        Returns:
            HandlerResponse: TABLE response with a single 'table_name' column,
            built from the controller's /tables endpoint.
        """
        tables = self._controller_get_json('tables')['tables']
        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(tables, columns=['table_name'])
        )

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse: TABLE response with one row per dimension, metric and
            date-time field of the table schema; the spec attributes 'name' and
            'dataType' are exposed as 'column_name' and 'data_type'.
        """
        schema = self._controller_get_json(f'tables/{table_name}/schema')

        # A schema may omit any of the groups (e.g. a table without metrics).
        field_specs = [
            spec
            for key in self._SCHEMA_FIELD_SPEC_KEYS
            for spec in schema.get(key) or []
        ]
        if field_specs:
            df = pd.DataFrame(field_specs).rename(
                columns={'name': 'column_name', 'dataType': 'data_type'}
            )
        else:
            df = pd.DataFrame(columns=['column_name', 'data_type'])

        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=df
        )