Coverage for mindsdb / integrations / handlers / aerospike_handler / aerospike_handler.py: 0%
135 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import re
2from typing import Optional
4import duckdb
5import aerospike
6import pandas as pd
7# from sqlalchemy import create_engine
9from mindsdb_sql_parser import parse_sql
10from mindsdb_sql_parser.ast.base import ASTNode
12# from mindsdb.utilities import log
13from mindsdb.integrations.libs.base import DatabaseHandler
14from mindsdb.integrations.libs.response import (
15 HandlerStatusResponse as StatusResponse,
16 HandlerResponse as Response,
17 RESPONSE_TYPE
18)
class AerospikeHandler(DatabaseHandler):
    """
    This handler handles connection and execution of SQL statements against Aerospike.

    Aerospike sets are exposed as tables (namespace -> ``table_schema``,
    set -> ``table_name``). A query is answered by scanning the whole set into a
    pandas DataFrame; any projection or clause beyond ``SELECT * FROM <set>``
    (WHERE, ORDER BY, LIMIT, ...) is then evaluated locally with DuckDB.
    """

    name = 'aerospike'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler and validate the connection parameters.

        Args:
            name (str): name of this handler instance.
            connection_data (dict): connection parameters. ``host`` and ``port`` are
                required; ``user``, ``password`` and ``namespace`` are optional.
            **kwargs: extra keyword arguments, kept on ``self.kwargs``.

        Raises:
            Exception: if ``host`` or ``port`` is missing.
        """
        super().__init__(name)
        # Initialise the connection state before validating, so that __del__
        # still works when one of the checks below raises.
        self.connection = None
        self.is_connected = False

        self.parser = parse_sql
        self.dialect = 'aerospike'
        # connection_data is Optional: treat None as "no parameters" so the
        # missing-host check reports its message instead of an AttributeError.
        self.connection_data = connection_data or {}
        self.kwargs = kwargs
        if not self.connection_data.get('host'):
            raise Exception("The host parameter should be provided!")
        if not self.connection_data.get('port'):
            raise Exception("The port parameter should be provided!")

    def __del__(self):
        # getattr: the attribute may be missing if __init__ failed early.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def connect(self):
        """
        Set up the connection to the Aerospike cluster, reusing an open one.

        Returns:
            The connected ``aerospike`` client object.
        """
        if self.is_connected is True:
            return self.connection

        config = {
            'user': self.connection_data.get('user', None),
            'password': self.connection_data.get('password', None),
            # The port may arrive as a string from the connection form; the
            # host tuple is built with an integer port.
            'hosts': [(self.connection_data.get('host'), int(self.connection_data.get('port')))],
        }
        client = aerospike.client(config)
        # Connect exactly once (the previous code called connect() twice).
        client.connect()
        self.connection = client
        self.is_connected = True
        return self.connection

    def disconnect(self):
        """
        Close any existing connection. Does nothing when not connected.
        """
        if self.is_connected is False:
            return
        try:
            self.connection.close()
        finally:
            # Mark the handler as disconnected even if close() raised.
            self.is_connected = False
            self.connection = None

    def check_connection(self) -> StatusResponse:
        """
        Check the connection of the Aerospike database.

        Opens a connection (closing it again if this call opened it) and reports
        whether that succeeded.

        Returns:
            HandlerStatusResponse: ``success`` is True on success; otherwise
            ``error_message`` holds the connection error.
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            response.error_message = str(e)

        if response.success is True and need_to_close:
            self.disconnect()
        if response.success is False and self.is_connected is True:
            self.is_connected = False
        return response

    def native_query(self, query: str) -> Response:
        """
        Execute a ``SELECT ... FROM [<namespace>.]<set> ...`` query.

        The whole set is scanned (no server-side filtering is done); the rows are
        then returned as-is for a plain ``SELECT * FROM <set>``, or run through
        DuckDB for any projection or trailing clause.

        Args:
            query (str): query in native format.

        Returns:
            HandlerResponse: a TABLE response with the result rows, or an ERROR
            response carrying the exception message.
        """
        need_to_close = self.is_connected is False

        try:
            connection = self.connect()
            selected_bins, aero_ns, aero_set = self.parse_aql_query(query)
            if not aero_ns:
                raise ValueError(
                    "Namespace is not specified: use <namespace>.<set> in the query "
                    "or provide the 'namespace' connection parameter"
                )
            aero_ns = aero_ns.lower()
            aero_set = aero_set.lower()

            # Full scan of the set; each result is a record tuple whose third
            # element holds the bins dict that becomes one DataFrame row.
            records = connection.scan(aero_ns, aero_set).results()
            data_df = pd.DataFrame.from_records([record[2] for record in records])

            from_clause = re.search(r'\bFROM\s+[\w\.`]+', query, flags=re.IGNORECASE)
            if from_clause is None:
                raise ValueError("Could not locate the FROM clause of the query")
            trailing = query[from_clause.end():].strip().rstrip(';').strip()

            # Anything beyond "SELECT * FROM <set>" is evaluated locally by DuckDB.
            # A scan with no records yields a column-less frame that DuckDB cannot
            # query, so it is returned as an empty result instead.
            if ('*' not in selected_bins or trailing) and len(data_df.columns) > 0:
                local_query = query[:from_clause.start()] + 'FROM data_df' + query[from_clause.end():]
                # Drop "<set>." qualifiers so column references resolve against data_df.
                local_query = re.sub(
                    rf'\b{re.escape(aero_set)}\.', '', local_query, flags=re.IGNORECASE
                )
                # DuckDB resolves "data_df" to the local pandas DataFrame above.
                data_df = duckdb.query(local_query).to_df()

            response = Response(
                RESPONSE_TYPE.TABLE,
                data_df
            )
        except Exception as e:
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )
        finally:
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Retrieve the data from the SQL statement.

        Renders the AST back to SQL text and delegates to ``native_query``.
        """
        return self.native_query(query.to_string())

    def parse_aql_query(self, aql_query):
        """
        Extract the projection, namespace and set from a simple SELECT query.

        Args:
            aql_query (str): query shaped like ``SELECT <bins> FROM [<namespace>.]<set> ...``.

        Returns:
            tuple: ``(selected_bins, namespace, set_name)``.
            - ``selected_bins`` is the list of upper-cased whitespace tokens between
              SELECT and FROM, with commas removed.
            - ``namespace`` and ``set_name`` are upper-cased.
            - When the set is not qualified with a namespace, the ``namespace``
              connection parameter is returned instead, which may be None.

        Raises:
            ValueError: if the query has no SELECT or FROM token.
            IndexError: if nothing follows FROM.
        """
        # Split the query into upper-cased whitespace tokens.
        tokens = [t.replace(',', '').upper() for t in re.split(r'\s+', aql_query)]
        select_index = tokens.index("SELECT")
        from_index = tokens.index("FROM")

        selected_bins = tokens[select_index + 1:from_index]
        # Remove identifier quoting and a trailing statement terminator.
        namespace_set = tokens[from_index + 1].replace('`', '').rstrip(';')
        # Explicit branches: the former one-line conditional bound only the first
        # assignment target, so "ns.set" yielded a list as the namespace.
        if '.' in namespace_set:
            aero_ns, aero_set = namespace_set.split('.', 1)
        else:
            aero_ns, aero_set = None, namespace_set
        if not aero_ns:
            aero_ns = self.connection_data.get('namespace')
        return selected_bins, aero_ns, aero_set

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables (sets) in Aerospike.

        Returns:
            HandlerResponse: a TABLE response with columns ``table_schema``
            (namespace), ``table_name`` (set) and ``table_type`` (always
            ``"sets"``), or an ERROR response.
        """
        need_to_close = self.is_connected is False
        request = "sets"
        data_lst = []
        # info_all asks every node of the cluster, and each one reports the
        # same sets; keep one row per (namespace, set).
        seen = set()

        try:
            connection = self.connect()
            for _node, (_err, res) in connection.info_all(request).items():
                if not res:
                    continue
                # Entries are ';'-separated. The parsing below expects the layout
                # "...=<namespace>:set=<set>:objects=..." (presumably
                # "ns=<ns>:set=<set>:objects=<n>:..."; confirm per server version).
                for entry in (e.strip() for e in res.strip().split(';')):
                    if not entry:
                        continue
                    namespace = set_name = None
                    for part in entry.split('='):
                        if ':set' in part:
                            namespace = part.split(':')[0]
                        if ':objects' in part:
                            set_name = part.split(':')[0]
                    # Previously this tested characters of a leaked loop variable;
                    # the intent is to keep entries where something was parsed.
                    if not (namespace or set_name):
                        continue
                    key = (namespace, set_name)
                    if key in seen:
                        continue
                    seen.add(key)
                    data_lst.append([namespace, set_name, request])

            response = Response(
                RESPONSE_TYPE.TABLE,
                pd.DataFrame(data_lst, columns=['table_schema', 'table_name', 'table_type'])
            )
        except Exception as e:
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )
        finally:
            if need_to_close is True:
                self.disconnect()

        return response

    def get_columns(self, table_name: str) -> Response:
        """
        Show details about the table (set).

        Column types are inferred by pandas from a full scan of the set.

        Args:
            table_name (str): set name, as listed by ``get_tables``.

        Returns:
            HandlerResponse: a TABLE response with columns ``column_name`` and
            ``data_type`` (dtype rendered as a string). The table is empty when
            the set is unknown or has no records. An ERROR response is returned
            on failure.
        """
        need_to_close = self.is_connected is False
        column_df = pd.DataFrame([], columns=['column_name', 'data_type'])

        try:
            connection = self.connect()
            tables_response = self.get_tables()
            tables_df = tables_response.data_frame
            if tables_df is None:
                raise RuntimeError(
                    getattr(tables_response, 'error_message', None) or 'Failed to list Aerospike sets'
                )

            matches = tables_df[tables_df['table_name'] == table_name]
            if not matches.empty:
                table = matches.iloc[0]
                records = connection.scan(table['table_schema'], table['table_name']).results()
                data_df = pd.DataFrame.from_records([record[2] for record in records])
                column_df = data_df.dtypes.astype(str).reset_index()
                column_df.columns = ['column_name', 'data_type']

            response = Response(
                RESPONSE_TYPE.TABLE,
                column_df
            )
        except Exception as e:
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )
        finally:
            if need_to_close is True:
                self.disconnect()

        return response