Coverage for mindsdb / integrations / handlers / lindorm_handler / lindorm_handler.py: 0%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2import pandas as pd 

3import phoenixdb 

4from mindsdb_sql_parser import parse_sql 

5from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

6from mindsdb.integrations.libs.base import DatabaseHandler 

7from pyphoenix.sqlalchemy_phoenix import PhoenixDialect 

8from mindsdb_sql_parser.ast.base import ASTNode 

9from mindsdb.utilities import log 

10from mindsdb.integrations.libs.response import ( 

11 HandlerStatusResponse as StatusResponse, 

12 HandlerResponse as Response, 

13 RESPONSE_TYPE 

14) 

15 

16logger = log.getLogger(__name__) 

17 

18 

class LindormHandler(DatabaseHandler):
    """
    This handler handles connection and execution of SQL statements on
    Lindorm through the Phoenix (phoenixdb) driver.
    """

    name = 'lindorm'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                Read keys: 'url', 'autocommit', 'lindorm_user',
                'lindorm_password'. The last three default to None.
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'phoenix'

        # Work on a private copy: the caller's dict is never mutated, and a
        # None value (allowed by the annotation) no longer raises TypeError.
        connection_data = dict(connection_data or {})
        for parameter in ('autocommit', 'lindorm_user', 'lindorm_password'):
            connection_data.setdefault(parameter, None)

        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

    def __del__(self):
        # getattr guard: __init__ may have raised before is_connected was set,
        # in which case there is nothing to close.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def connect(self):
        """
        Set up the connection required by the handler.
        Returns:
            The connection object returned by phoenixdb.connect(). It is
            cached on the handler and reused while is_connected is True.
        Raises:
            KeyError: if 'url' is missing from the connection data.
        """

        if self.is_connected is True:
            return self.connection

        self.connection = phoenixdb.connect(
            url=self.connection_data['url'],
            autocommit=self.connection_data['autocommit'],
            lindorm_user=self.connection_data['lindorm_user'],
            lindorm_password=self.connection_data['lindorm_password'],
        )
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """ Close any existing connections

        Should switch self.is_connected.

        Returns:
            None when there was no open connection, otherwise False
            (the new value of self.is_connected).
        """

        if self.is_connected is False:
            return

        try:
            self.connection.close()
        finally:
            # Even if close() fails the connection must not be reused.
            self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.
        Returns:
            HandlerStatusResponse: success is True when connect() did not
            raise; otherwise error_message holds the exception text.
        """

        response = StatusResponse(False)
        # Only close the connection afterwards if this call opened it.
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to the Phoenix Query Server, {e}!')
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: TABLE with a data frame when the statement
            produced a result set (possibly with zero rows), OK when it did
            not, ERROR with the exception text when execution failed.
        """

        need_to_close = self.is_connected is False

        connection = self.connect()
        cursor = connection.cursor()

        try:
            cursor.execute(query)
            # Per DB-API 2.0, description is None for operations that return
            # no rows. Deciding on it (rather than on the fetched rows) keeps
            # an empty SELECT a TABLE response and avoids calling fetchall()
            # on statements that have no result set.
            if cursor.description:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(
                        cursor.fetchall(),
                        columns=[column[0] for column in cursor.description]
                    )
                )
            else:
                connection.commit()
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f'Error running query: {query} on the Lindorm Query Server!')
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )
        finally:
            cursor.close()
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """

        renderer = SqlalchemyRender(PhoenixDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.
        Returns:
            HandlerResponse: on success its data frame has a single
            'table_name' column with every SYSTEM.CATALOG table whose schema
            is not 'SYSTEM'. A response without a data frame (for example an
            error) is returned unchanged.
        """

        query = """
            SELECT DISTINCT TABLE_NAME, TABLE_SCHEM FROM SYSTEM.CATALOG
        """
        result = self.native_query(query)
        df = result.data_frame
        if df is None:
            # The query failed or produced no result set: nothing to filter.
            return result

        df = df[df['TABLE_SCHEM'] != 'SYSTEM']
        df = df.drop('TABLE_SCHEM', axis=1)
        result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
        return result

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables().
                NOTE(review): the name is interpolated into the SQL text
                as-is, so it must come from a trusted source.
        Returns:
            HandlerResponse: TABLE with 'column_name' and 'data_type' columns
            taken from the first two fields of each cursor.description
            entry, or ERROR with the exception text.
        """

        need_to_close = self.is_connected is False

        connection = self.connect()
        cursor = connection.cursor()

        # Built before the try so the except branch can always log it.
        query = f"SELECT * from {table_name} LIMIT 5"

        try:
            # The rows are discarded; the query runs only so the driver
            # populates cursor.description.
            cursor.execute(query)
            cursor.fetchall()

            response = Response(
                RESPONSE_TYPE.TABLE,
                data_frame=pd.DataFrame(
                    [(column[0], column[1]) for column in cursor.description],
                    columns=['column_name', 'data_type']
                )
            )

        except Exception as e:
            logger.error(f'Error running query: {query} on the Phoenix Query Server!')
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )
        finally:
            cursor.close()
            if need_to_close is True:
                self.disconnect()

        return response