Coverage for mindsdb / integrations / handlers / aerospike_handler / aerospike_handler.py: 0%

135 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import re 

2from typing import Optional 

3 

4import duckdb 

5import aerospike 

6import pandas as pd 

7# from sqlalchemy import create_engine 

8 

9from mindsdb_sql_parser import parse_sql 

10from mindsdb_sql_parser.ast.base import ASTNode 

11 

12# from mindsdb.utilities import log 

13from mindsdb.integrations.libs.base import DatabaseHandler 

14from mindsdb.integrations.libs.response import ( 

15 HandlerStatusResponse as StatusResponse, 

16 HandlerResponse as Response, 

17 RESPONSE_TYPE 

18) 

19 

20 

class AerospikeHandler(DatabaseHandler):
    """
    This handler handles connection and execution of SQL-like statements against Aerospike.

    Queries are served by scanning the whole requested set into a pandas
    DataFrame; filtering and projection are then delegated to DuckDB.
    """
    name = 'aerospike'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Args:
            name (str): name of the handler instance
            connection_data (dict): must contain 'host' and 'port'; 'user',
                'password' and 'namespace' are read when present
            **kwargs: stored unchanged on the instance

        Raises:
            Exception: if 'host' or 'port' is missing from connection_data
        """
        super().__init__(name)
        # Initialise connection state before validating, so that __del__ does
        # not fail on a half-constructed instance when validation raises.
        self.connection = None
        self.is_connected = False

        self.parser = parse_sql
        self.dialect = 'aerospike'
        # Treat a missing dict like an empty one so the checks below raise the
        # intended message instead of an AttributeError on None.
        self.connection_data = connection_data or {}
        self.kwargs = kwargs
        if not self.connection_data.get('host'):
            raise Exception("The host parameter should be provided!")
        if not self.connection_data.get('port'):
            raise Exception("The port parameter should be provided!")

    def __del__(self):
        # getattr guards against instances whose __init__ never got far
        # enough to set the attribute.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def connect(self):
        """
        Set up the connection required by the handler.
        Returns:
            the connected Aerospike client (the cached one when already connected)
        """
        if self.is_connected is True:
            return self.connection

        user = self.connection_data.get('user', None)
        password = self.connection_data.get('password', None)
        config = {
            'user': user,
            'password': password,
            'hosts': [(self.connection_data.get('host'), self.connection_data.get('port'))],
        }
        # Connect once, and only mark the handler as connected after the call
        # has succeeded.
        self.connection = aerospike.client(config).connect()
        self.is_connected = True
        return self.connection

    def disconnect(self):
        """
        Close any existing connections.
        """
        if self.is_connected is False:
            return
        self.connection.close()
        self.is_connected = False
        return

    def check_connection(self) -> StatusResponse:
        """
        Check the connection of the Aerospike database
        Returns:
            HandlerStatusResponse
        """

        response = StatusResponse(False)
        # Only close the connection afterwards if this call opened it.
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            response.error_message = str(e)

        if response.success is True and need_to_close:
            self.disconnect()
        if response.success is False and self.is_connected is True:
            self.is_connected = False
        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.

        The target set is scanned in full. When the query has a WHERE clause
        or selects anything other than '*', the query is re-run by DuckDB
        against the scanned DataFrame.

        Args:
            query (str): query in native format
        Returns:
            HandlerResponse
        """

        need_to_close = self.is_connected is False

        connection = self.connect()

        try:
            selected_bins, aero_ns, aero_set = self.parse_aql_query(query)
            if not aero_ns:
                raise ValueError(
                    "No namespace found: use <namespace>.<set> in the query "
                    "or provide the 'namespace' connection parameter."
                )
            # NOTE: namespace and set names are lower-cased before scanning
            # (behaviour kept from the original implementation).
            aero_ns = aero_ns.lower()
            aero_set = aero_set.lower()
            records = connection.scan(aero_ns, aero_set).results()
            # The bins are read from the third element of every scanned record.
            # Keep the local name `data_df`: the rewritten query below refers
            # to it by name (DuckDB is expected to resolve it to this
            # DataFrame through its pandas replacement scan).
            data_df = pd.DataFrame.from_records([record[2] for record in records])

            has_where = re.search(r'\bwhere\b', query, flags=re.IGNORECASE) is not None
            if has_where or '*' not in selected_bins:
                # Point the first FROM clause at the DataFrame, whatever the
                # keyword's letter case.
                new_query = re.sub(
                    r'\bFROM\s+[\w.]+', 'FROM data_df', query, count=1, flags=re.IGNORECASE
                )
                # Drop "<set>." qualifiers in front of column names; the word
                # boundary avoids touching longer identifiers ending in the set name.
                new_query = re.sub(
                    rf'\b{re.escape(aero_set)}\.', '', new_query, flags=re.IGNORECASE
                )
                data_df = duckdb.query(new_query).to_df()

            response = Response(
                RESPONSE_TYPE.TABLE,
                data_df
            )
        except Exception as e:
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )

        if need_to_close is True:
            self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Retrieve the data from the SQL statement.
        """
        return self.native_query(query.to_string())

    def parse_aql_query(self, aql_query):
        """
        Extract the selected bins, namespace and set from a SELECT statement.

        Args:
            aql_query (str): statement of the form
                SELECT <bins> FROM [<namespace>.]<set> ...
        Returns:
            tuple: (selected_bins, aero_ns, aero_set). Values taken from the
            query are upper-cased; when the query gives no namespace, the
            'namespace' connection parameter is returned as-is (may be None).
        Raises:
            ValueError: if SELECT or FROM is missing, or nothing follows FROM
        """
        # Split on whitespace and commas so "a,b" yields two bins.
        tokens = [t.upper() for t in re.split(r'[\s,]+', aql_query) if t]
        select_index = tokens.index("SELECT")
        from_index = tokens.index("FROM")

        selected_bins = tokens[select_index + 1:from_index]
        if from_index + 1 >= len(tokens):
            raise ValueError("The query does not name a set after FROM.")
        # A statement terminator glued to the set name is not part of the name.
        namespace_set = tokens[from_index + 1].rstrip(';')
        if '.' in namespace_set:
            aero_ns, aero_set = namespace_set.split('.', 1)
        else:
            aero_ns, aero_set = None, namespace_set
        if not aero_ns:
            aero_ns = self.connection_data.get('namespace')
        return selected_bins, aero_ns, aero_set

    @staticmethod
    def _parse_sets_info(info_text: str) -> list:
        """
        Turn one node's "sets" info response into (namespace, set) pairs.

        Entries are expected to be ';'-separated, each made of ':'-separated
        key=value fields carrying 'ns' and 'set' keys (the layout the original
        parsing code relied on).
        """
        # Defensive: drop a "<command>\t" prefix if the response carries one.
        if '\t' in info_text:
            info_text = info_text.split('\t', 1)[1]

        pairs = []
        for entry in info_text.strip().split(';'):
            entry = entry.strip()
            if not entry:
                continue
            fields = dict(item.split('=', 1) for item in entry.split(':') if '=' in item)
            namespace, set_name = fields.get('ns'), fields.get('set')
            if namespace or set_name:
                pairs.append((namespace, set_name))
        return pairs

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables in Aerospike

        Returns:
            HandlerResponse: table with columns table_schema (namespace),
            table_name (set) and table_type, or an error response
        """
        need_to_close = self.is_connected is False
        connection = self.connect()

        request = "sets"

        try:
            data_lst = []
            seen = set()
            for _node, (_err, res) in connection.info_all(request).items():
                if not res:
                    continue
                for pair in self._parse_sets_info(res):
                    # Several nodes may report the same set; list it once.
                    if pair in seen:
                        continue
                    seen.add(pair)
                    data_lst.append([pair[0], pair[1], request])

            response = Response(
                RESPONSE_TYPE.TABLE,
                pd.DataFrame(data_lst, columns=['table_schema', 'table_name', 'table_type'])
            )
        except Exception as e:
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )

        if need_to_close is True:
            self.disconnect()

        return response

    def get_columns(self, table_name: str) -> Response:
        """
        Show details about the table

        Column names and types are inferred from the dtypes of a DataFrame
        built from a full scan of the set.

        Args:
            table_name (str): name of the set, as listed by get_tables
        Returns:
            HandlerResponse: table with columns column_name and data_type
            (empty when no sets exist), or an error response
        """
        need_to_close = self.is_connected is False
        connection = self.connect()

        try:
            response_table = self.get_tables()
            tables_df = response_table.data_frame
            if tables_df is None:
                # get_tables failed; hand its error response back unchanged.
                response = response_table
            elif tables_df.empty:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    pd.DataFrame([], columns=['column_name', 'data_type'])
                )
            else:
                matches = tables_df[tables_df['table_name'] == table_name]
                if matches.empty:
                    raise ValueError(f"Set '{table_name}' was not found.")
                table_row = matches.iloc[0]
                records = connection.scan(
                    table_row['table_schema'], table_row['table_name']
                ).results()
                data_df = pd.DataFrame.from_records([record[2] for record in records])
                column_df = pd.DataFrame(data_df.dtypes).reset_index()
                column_df.columns = ['column_name', 'data_type']
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    column_df
                )
        except Exception as e:
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )

        if need_to_close is True:
            self.disconnect()

        return response