Coverage for mindsdb / integrations / handlers / solr_handler / solr_handler.py: 0%

88 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2import pandas as pd 

3 

4from sqlalchemy import create_engine 

5 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb_sql_parser.ast.base import ASTNode 

8 

9from mindsdb.utilities import log 

10from mindsdb.integrations.libs.base import DatabaseHandler 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13 HandlerResponse as Response, 

14 RESPONSE_TYPE 

15) 

16 

17 

# Module-level logger for this handler, obtained from the `log` utility imported above.
logger = log.getLogger(__name__)

19 

20 

class SolrHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Solr SQL statements.

    Connections are opened through SQLAlchemy using a ``solr://`` URL that is
    built from the ``connection_data`` passed to the constructor.
    """

    name = 'solr'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Validate and store the connection settings. No connection is opened here.

        Args:
            name (str): handler instance name, forwarded to the base class
            connection_data (dict): must contain 'host', 'port' and 'collection';
                may contain 'use_ssl', 'username', 'password' and 'server_path'
            **kwargs: stored unchanged on ``self.kwargs``
        Raises:
            Exception: if 'host', 'port' or 'collection' is missing
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'solr'

        # Initialise the connection state before validating, so that __del__
        # finds these attributes even when validation raises below.
        self.connection = None
        self.is_connected = False

        # Work on a copy so the caller's dict is never mutated. A None value is
        # treated as an empty dict and therefore fails the required-key check.
        connection_data = dict(connection_data or {})

        required_parameters = ('host', 'port', 'collection')
        if any(parameter not in connection_data for parameter in required_parameters):
            raise Exception("The host, port and collection parameter should be provided!")

        optional_parameters = ['use_ssl', 'username', 'password']
        for parameter in optional_parameters:
            connection_data.setdefault(parameter, None)

        # Normalise 'use_ssl' to a strict bool (a missing/None value becomes False).
        connection_data['use_ssl'] = bool(connection_data['use_ssl'])

        self.connection_data = connection_data
        self.kwargs = kwargs

    def __del__(self):
        # getattr guards against instances whose __init__ failed part-way.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def connect(self):
        """
        Set up the connection required by the handler.

        Returns:
            The open connection; an already open connection is reused.
        """
        if self.is_connected is True:
            return self.connection

        config = {
            'username': self.connection_data.get('username'),
            'password': self.connection_data.get('password'),
            'host': self.connection_data.get('host'),
            'port': self.connection_data.get('port'),
            'server_path': self.connection_data.get('server_path', 'solr'),
            'collection': self.connection_data.get('collection'),
            'use_ssl': self.connection_data.get('use_ssl')
        }

        engine = create_engine(
            "solr://{username}:{password}@{host}:{port}/{server_path}/{collection}/sql?use_ssl={use_ssl}".format(**config)
        )
        # Flag the handler as connected only after the connection really opened;
        # otherwise a failed attempt would leave is_connected=True with no connection.
        self.connection = engine.connect()
        self.is_connected = True
        return self.connection

    def disconnect(self):
        """
        Close any existing connections.
        """
        if self.is_connected is False:
            return
        if self.connection is not None:
            self.connection.close()
        self.is_connected = False
        return

    def check_connection(self) -> StatusResponse:
        """
        Check the connection of the Solr database

        Returns:
            HandlerStatusResponse: ``success`` is True when connect() did not
            raise; otherwise ``error_message`` carries the exception text.
        """
        response = StatusResponse(False)
        # Only close the connection afterwards if this call is the one that opened it.
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Solr {self.connection_data["host"]}, {e}!')
            response.error_message = str(e)

        if response.success is True and need_to_close:
            self.disconnect()

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.

        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: TABLE with the fetched rows, OK when the statement
            returns no rows, or ERROR with the exception text if execution fails.
        """
        # Only close the connection afterwards if this call is the one that opened it.
        need_to_close = self.is_connected is False

        connection = self.connect()

        try:
            result = connection.execute(query)
            # A result object is always truthy, so ask it explicitly whether
            # the statement produced a row set before reading its columns.
            if result.returns_rows:
                columns = list(result.keys())
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    pd.DataFrame(
                        result,
                        columns=columns
                    )
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f'Error running query: {query} on {self.connection_data["host"]}!')
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )
        finally:
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Retrieve the data from the SQL statement.

        Args:
            query (ASTNode): parsed statement; rendered with ``to_string()``
                and passed to native_query()
        Returns:
            HandlerResponse
        """
        return self.native_query(query.to_string())

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables in Solr

        The configured collection is reported as the only table.

        Returns:
            HandlerResponse: TABLE response with a single 'table_name' column
        """
        tables = pd.DataFrame({'table_name': [self.connection_data.get('collection')]})
        return Response(RESPONSE_TYPE.TABLE, tables)

    def get_columns(self, table_name) -> Response:
        """
        Show details about the table

        Args:
            table_name: name of the table whose columns are listed
        Returns:
            HandlerResponse: the response of a one-row probe query with its data
            frame replaced by a single 'column_name' column; if the probe
            produced no data frame (e.g. it failed) the response is returned as is.
        """
        q = f"select * from {table_name} limit 1"
        result = self.native_query(q)
        if result.data_frame is None:
            # Nothing to inspect; hand the (error) response back to the caller.
            return result
        result.data_frame = pd.DataFrame({'column_name': list(result.data_frame.columns)})
        return result

177 return result