Coverage for mindsdb / integrations / handlers / ignite_handler / ignite_handler.py: 0%
95 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3from pyignite import Client
4import pandas as pd
6from mindsdb_sql_parser import parse_sql
7from mindsdb.integrations.libs.base import DatabaseHandler
9from mindsdb_sql_parser.ast.base import ASTNode
11from mindsdb.utilities import log
12from mindsdb.integrations.libs.response import (
13 HandlerStatusResponse as StatusResponse,
14 HandlerResponse as Response,
15 RESPONSE_TYPE
16)
19logger = log.getLogger(__name__)
22class IgniteHandler(DatabaseHandler):
23 """
24 This handler handles connection and execution of the Apache Ignite statements.
25 """
27 name = 'ignite'
29 def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
30 """
31 Initialize the handler.
32 Args:
33 name (str): name of particular handler instance
34 connection_data (dict): parameters for connecting to the database
35 **kwargs: arbitrary keyword arguments.
36 """
37 super().__init__(name)
38 self.parser = parse_sql
39 self.dialect = 'ignite'
41 optional_parameters = ['username', 'password', 'schema']
42 for parameter in optional_parameters:
43 if parameter not in connection_data:
44 connection_data[parameter] = None
46 self.connection_data = connection_data
47 self.kwargs = kwargs
49 self.client = None
50 self.connection = None
51 self.is_connected = False
53 def __del__(self):
54 if self.is_connected is True:
55 self.disconnect()
57 def connect(self) -> StatusResponse:
58 """
59 Set up the connection required by the handler.
60 Returns:
61 HandlerStatusResponse
62 """
64 if self.is_connected is True:
65 return self.connection
67 self.client = Client(
68 username=self.connection_data['username'],
69 password=self.connection_data['password']
70 )
72 try:
73 port = int(self.connection_data['port'])
74 except ValueError:
75 raise ValueError("Invalid port number")
77 nodes = [(self.connection_data['host'], port)]
78 self.connection = self.client.connect(nodes)
79 self.is_connected = True
81 return self.client, self.connection
83 def disconnect(self):
84 """
85 Close any existing connections.
86 """
88 if self.is_connected is False:
89 return
91 self.client.close()
92 self.is_connected = False
93 return self.is_connected
95 def check_connection(self) -> StatusResponse:
96 """
97 Check connection to the handler.
98 Returns:
99 HandlerStatusResponse
100 """
102 response = StatusResponse(False)
103 need_to_close = self.is_connected is False
105 try:
106 self.connect()
107 response.success = True
108 except Exception as e:
109 logger.error('Error connecting to Apache Ignite!')
110 response.error_message = str(e)
111 finally:
112 if response.success is True and need_to_close:
113 self.disconnect()
114 if response.success is False and self.is_connected is True:
115 self.is_connected = False
117 return response
119 def native_query(self, query: str) -> StatusResponse:
120 """
121 Receive raw query and act upon it somehow.
122 Args:
123 query (str): query in native format
124 Returns:
125 HandlerResponse
126 """
128 need_to_close = self.is_connected is False
130 client, connection = self.connect()
132 try:
133 with connection:
134 with client.sql(query, include_field_names=True, schema=self.connection_data['schema']) as cursor:
135 result = list(cursor)
136 if result and result[0][0] != 'UPDATED':
137 response = Response(
138 RESPONSE_TYPE.TABLE,
139 data_frame=pd.DataFrame(
140 result[1:],
141 columns=result[0]
142 )
143 )
144 else:
145 response = Response(RESPONSE_TYPE.OK)
146 except Exception as e:
147 logger.error(f'Error running query: {query} on Apache Ignite!')
148 response = Response(
149 RESPONSE_TYPE.ERROR,
150 error_message=str(e)
151 )
153 cursor.close()
154 if need_to_close is True:
155 self.disconnect()
157 return response
159 def query(self, query: ASTNode) -> StatusResponse:
160 """
161 Receive query as AST (abstract syntax tree) and act upon it somehow.
162 Args:
163 query (ASTNode): sql query represented as AST. May be any kind
164 of query: SELECT, INTSERT, DELETE, etc
165 Returns:
166 HandlerResponse
167 """
169 if isinstance(query, ASTNode):
170 query_str = query.to_string()
171 else:
172 query_str = str(query)
174 return self.native_query(query_str)
176 def get_tables(self) -> StatusResponse:
177 """
178 Return list of entities that will be accessible as tables.
179 Returns:
180 HandlerResponse
181 """
183 query = """
184 SELECT TABLE_NAME FROM SYS.TABLES
185 """
186 result = self.native_query(query)
187 df = result.data_frame
188 result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
189 return result
191 def get_columns(self, table_name: str) -> StatusResponse:
192 """
193 Returns a list of entity columns.
194 Args:
195 table_name (str): name of one of tables returned by self.get_tables()
196 Returns:
197 HandlerResponse
198 """
200 query = f"""
201 SELECT COLUMN_NAME, TYPE FROM SYS.TABLE_COLUMNS WHERE TABLE_NAME = '{table_name.upper()}'
202 """
203 result = self.native_query(query)
204 df = result.data_frame
205 df['TYPE'] = df.apply(lambda row: row['TYPE'].split('.')[-1], axis=1)
206 df = df.iloc[2:]
207 result.data_frame = df.rename(columns={'COLUMN_NAME': 'column_name', 'TYPE': 'data_type'})
208 return result