Coverage for mindsdb / integrations / handlers / ignite_handler / ignite_handler.py: 0%

95 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3from pyignite import Client 

4import pandas as pd 

5 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.integrations.libs.base import DatabaseHandler 

8 

9from mindsdb_sql_parser.ast.base import ASTNode 

10 

11from mindsdb.utilities import log 

12from mindsdb.integrations.libs.response import ( 

13 HandlerStatusResponse as StatusResponse, 

14 HandlerResponse as Response, 

15 RESPONSE_TYPE 

16) 

17 

18 

19logger = log.getLogger(__name__) 

20 

21 

22class IgniteHandler(DatabaseHandler): 

23 """ 

24 This handler handles connection and execution of the Apache Ignite statements. 

25 """ 

26 

27 name = 'ignite' 

28 

def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
    """
    Initialize the handler.

    Args:
        name (str): name of particular handler instance
        connection_data (dict): parameters for connecting to the database.
            May be None, in which case an empty parameter set is used.
        **kwargs: arbitrary keyword arguments.
    """
    super().__init__(name)
    self.parser = parse_sql
    self.dialect = 'ignite'

    # Work on a private copy: the signature allows None, and filling in the
    # defaults below must not modify the dictionary owned by the caller.
    connection_data = dict(connection_data or {})

    # Optional parameters are always present afterwards (None when absent),
    # so the rest of the handler can index them without checking.
    for parameter in ('username', 'password', 'schema'):
        connection_data.setdefault(parameter, None)

    self.connection_data = connection_data
    self.kwargs = kwargs

    # Connection state; populated by connect().
    self.client = None
    self.connection = None
    self.is_connected = False

52 

def __del__(self):
    """
    Close the connection when the handler is garbage-collected.
    """
    # getattr with a default: the finalizer also runs for instances whose
    # __init__ raised before `is_connected` was assigned, and a plain
    # attribute access would raise AttributeError in that case.
    if getattr(self, 'is_connected', False) is True:
        self.disconnect()

56 

def connect(self) -> tuple:
    """
    Set up the connection required by the handler.

    Returns:
        tuple: ``(client, connection)`` - the pyignite client and the object
        returned by its ``connect`` call. The same pair is returned whether
        the connection already existed or was just created.

    Raises:
        ValueError: if the configured port cannot be converted to an integer.
    """
    if self.is_connected is True:
        # Same shape as the fresh-connection path below; native_query
        # unpacks the result into two names.
        return self.client, self.connection

    # Validate the port before creating any client object. int(None) raises
    # TypeError rather than ValueError, so both are translated.
    try:
        port = int(self.connection_data['port'])
    except (TypeError, ValueError) as err:
        raise ValueError("Invalid port number") from err

    self.client = Client(
        username=self.connection_data['username'],
        password=self.connection_data['password'],
    )

    nodes = [(self.connection_data['host'], port)]
    self.connection = self.client.connect(nodes)
    self.is_connected = True

    return self.client, self.connection

82 

def disconnect(self):
    """
    Close any existing connections.

    Returns:
        bool: the connection flag after the call, which is always False.
    """
    if self.is_connected is False:
        # Nothing to close; report the same value as the normal path.
        return False

    try:
        self.client.close()
    finally:
        # Reset the flag even if close() raises, so a later connect()
        # builds a fresh client instead of reusing a half-closed one.
        self.is_connected = False

    return self.is_connected

94 

def check_connection(self) -> StatusResponse:
    """
    Check connection to the handler.

    Attempts to connect and reports the outcome. A connection opened only
    for this check is closed again before returning.

    Returns:
        HandlerStatusResponse: ``success`` is True when connect() did not
        raise; otherwise ``error_message`` carries the exception text.
    """
    # Remember whether this call is the one opening the connection.
    opened_here = self.is_connected is False
    status = StatusResponse(False)

    try:
        self.connect()
    except Exception as exc:
        logger.error('Error connecting to Apache Ignite!')
        status.error_message = str(exc)
    else:
        status.success = True
    finally:
        if status.success is True:
            if opened_here:
                self.disconnect()
        elif status.success is False and self.is_connected is True:
            # The attempt failed: do not keep reporting a live connection.
            self.is_connected = False

    return status

118 

def native_query(self, query: str) -> Response:
    """
    Receive raw query and act upon it somehow.

    Args:
        query (str): query in native format

    Returns:
        HandlerResponse: a TABLE response holding the rows as a DataFrame,
        an OK response when the statement produced no rows or only reported
        an update, or an ERROR response carrying the exception text.
    """
    # Only close the connection afterwards if this call opened it.
    need_to_close = self.is_connected is False

    client, connection = self.connect()

    try:
        # NOTE(review): leaving `with connection` appears to close the client
        # as well (pyignite connection context manager) - confirm. If so,
        # `is_connected` is stale when the handler was connected on entry.
        with connection:
            # The cursor is closed by its own context manager; no explicit
            # close() is needed (and none is possible if sql() itself fails).
            with client.sql(
                query,
                include_field_names=True,
                schema=self.connection_data['schema'],
            ) as cursor:
                result = list(cursor)

        # With include_field_names=True the first row holds the column names;
        # a header starting with 'UPDATED' is treated as "no result set".
        if result and result[0][0] != 'UPDATED':
            response = Response(
                RESPONSE_TYPE.TABLE,
                data_frame=pd.DataFrame(result[1:], columns=result[0]),
            )
        else:
            response = Response(RESPONSE_TYPE.OK)
    except Exception as e:
        logger.error(f'Error running query: {query} on Apache Ignite!')
        response = Response(
            RESPONSE_TYPE.ERROR,
            error_message=str(e)
        )
    finally:
        if need_to_close is True:
            self.disconnect()

    return response

158 

def query(self, query: ASTNode) -> StatusResponse:
    """
    Receive query as AST (abstract syntax tree) and act upon it somehow.

    Args:
        query (ASTNode): sql query represented as AST. May be any kind
            of query: SELECT, INSERT, DELETE, etc

    Returns:
        HandlerResponse
    """
    # AST nodes are rendered with to_string(); anything else is stringified.
    native_sql = query.to_string() if isinstance(query, ASTNode) else str(query)
    return self.native_query(native_sql)

175 

def get_tables(self) -> Response:
    """
    Return list of entities that will be accessible as tables.

    Returns:
        HandlerResponse: on success its data frame has a single
        ``table_name`` column; a response without a usable data frame
        (e.g. a failed query) is returned unchanged.
    """
    result = self.native_query("SELECT TABLE_NAME FROM SYS.TABLES")

    df = result.data_frame
    # native_query builds error/OK responses without a data frame, so there
    # may be nothing to rename; hand the response back as it is.
    if df is None or len(df.columns) == 0:
        return result

    result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
    return result

190 

def get_columns(self, table_name: str) -> Response:
    """
    Returns a list of entity columns.

    Args:
        table_name (str): name of one of tables returned by self.get_tables()

    Returns:
        HandlerResponse: on success its data frame has ``column_name`` and
        ``data_type`` columns; a response without a data frame (e.g. a
        failed query) is returned unchanged.
    """
    # native_query accepts only a query string, so the name is embedded as a
    # SQL literal; double any single quote so it cannot end the literal.
    safe_name = table_name.upper().replace("'", "''")
    query = (
        "SELECT COLUMN_NAME, TYPE FROM SYS.TABLE_COLUMNS "
        f"WHERE TABLE_NAME = '{safe_name}'"
    )
    result = self.native_query(query)

    df = result.data_frame
    if df is None:
        return result

    # Keep only the last dotted component of each type name. A plain list
    # also works for an empty frame (unknown table), where a row-wise
    # DataFrame.apply does not yield a column.
    df = df.assign(TYPE=[type_name.split('.')[-1] for type_name in df['TYPE']])

    # The first two rows are discarded - presumably Ignite's implicit
    # _KEY/_VAL columns; TODO confirm the row order is guaranteed.
    df = df.iloc[2:]

    result.data_frame = df.rename(columns={'COLUMN_NAME': 'column_name', 'TYPE': 'data_type'})
    return result