Coverage for mindsdb / integrations / handlers / solr_handler / solr_handler.py: 0%
88 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
2import pandas as pd
4from sqlalchemy import create_engine
6from mindsdb_sql_parser import parse_sql
7from mindsdb_sql_parser.ast.base import ASTNode
9from mindsdb.utilities import log
10from mindsdb.integrations.libs.base import DatabaseHandler
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13 HandlerResponse as Response,
14 RESPONSE_TYPE
15)
18logger = log.getLogger(__name__)
class SolrHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Solr SQL statements.

    Connections are opened through a SQLAlchemy engine created from a
    ``solr://`` URL that is assembled from ``connection_data``.
    """

    name = 'solr'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Validate the connection parameters and initialise the handler state.

        Args:
            name (str): handler instance name, forwarded to the base class.
            connection_data (dict): must contain 'host', 'port' and
                'collection'; 'use_ssl', 'username', 'password' and
                'server_path' are optional.
            **kwargs: kept on the instance unchanged as ``self.kwargs``.

        Raises:
            Exception: if ``connection_data`` is missing or lacks one of the
                required keys.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'solr'

        # Initialise the connection state before validating so that __del__
        # and disconnect() always find these attributes, even when the
        # validation below raises.
        self.connection = None
        self.is_connected = False

        # Work on a copy: the defaults filled in below must not leak into the
        # caller's dictionary. A missing (None) dict is treated as empty so it
        # fails with the same validation error instead of a TypeError.
        connection_data = dict(connection_data) if connection_data else {}

        required_parameters = ('host', 'port', 'collection')
        if any(parameter not in connection_data for parameter in required_parameters):
            raise Exception("The host, port and collection parameter should be provided!")

        for parameter in ('use_ssl', 'username', 'password'):
            connection_data.setdefault(parameter, None)

        # Normalise use_ssl to a strict boolean based on its truthiness.
        connection_data['use_ssl'] = bool(connection_data.get('use_ssl', False))

        self.connection_data = connection_data
        self.kwargs = kwargs

    def __del__(self):
        """Close the connection, if one is open, when the handler is collected."""
        # getattr: the attribute may be absent if base-class init failed.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def connect(self):
        """
        Set up the connection required by the handler.

        An already open connection is reused.

        Returns:
            The connection object returned by the SQLAlchemy engine's
            ``connect()``.
        """
        if self.is_connected is True:
            return self.connection

        config = {
            'username': self.connection_data.get('username'),
            'password': self.connection_data.get('password'),
            'host': self.connection_data.get('host'),
            'port': self.connection_data.get('port'),
            'server_path': self.connection_data.get('server_path', 'solr'),
            'collection': self.connection_data.get('collection'),
            'use_ssl': self.connection_data.get('use_ssl'),
        }

        # NOTE(review): credentials are interpolated verbatim, so characters
        # such as '@' or '/' in the username/password are not escaped, and
        # missing credentials are rendered as the text "None" - confirm this
        # is what the solr dialect expects before changing the URL format.
        engine = create_engine(
            "solr://{username}:{password}@{host}:{port}/{server_path}/{collection}/sql?use_ssl={use_ssl}".format(**config)
        )

        # Only flag the handler as connected once the connection really
        # exists; a failed connect() must not leave is_connected == True with
        # self.connection still unset.
        self.connection = engine.connect()
        self.is_connected = True
        return self.connection

    def disconnect(self):
        """
        Close any existing connections.
        """
        if self.is_connected is False:
            return
        if self.connection is not None:
            self.connection.close()
        self.is_connected = False
        return

    def check_connection(self) -> StatusResponse:
        """
        Check the connection of the Solr database

        Returns:
            HandlerStatusResponse: ``success`` is True when a connection could
            be established; otherwise ``error_message`` holds the error text.
        """
        response = StatusResponse(False)
        # Remember whether this call opened the connection, so that a
        # connection that was already open is left open afterwards.
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Solr {self.connection_data["host"]}, {e}!')
            response.error_message = str(e)

        if response.success is True and need_to_close:
            self.disconnect()
        if response.success is False and self.is_connected is True:
            self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.

        Args:
            query (str): query in native format

        Returns:
            HandlerResponse: a TABLE response holding the rows as a DataFrame,
            or an ERROR response carrying the exception text if execution
            fails. Errors raised while connecting are not caught here.
        """
        # Only close the connection afterwards if this call opened it.
        need_to_close = self.is_connected is False

        connection = self.connect()

        try:
            # NOTE(review): the query is passed as a plain string; some
            # SQLAlchemy versions may require wrapping it in text() - confirm
            # against the version in use.
            result = connection.execute(query)
            columns = list(result.keys())
            if result:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    pd.DataFrame(result, columns=columns),
                )
            else:
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f'Error running query: {query} on {self.connection_data["host"]}!')
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e),
            )
        finally:
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Retrieve the data from the SQL statement.

        Args:
            query (ASTNode): parsed statement; it is rendered with
                ``to_string()`` and executed through ``native_query``.

        Returns:
            HandlerResponse
        """
        return self.native_query(query.to_string())

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables in Solr

        The only table reported is the configured collection.

        Returns:
            HandlerResponse: TABLE response with a single 'table_name' column.
        """
        # Build a real Response: a plain dict has no ``data_frame`` attribute,
        # so the attribute access on it could never succeed.
        tables = pd.DataFrame(
            [self.connection_data.get('collection')],
            columns=['table_name'],
        )
        return Response(RESPONSE_TYPE.TABLE, tables)

    def get_columns(self, table_name) -> Response:
        """
        Show details about the table

        The column names are taken from the result of a one-row SELECT.

        Args:
            table_name: name of the table (collection) to inspect.

        Returns:
            HandlerResponse: the response of the probing query with its
            data frame replaced by a single 'column_name' column, or the
            unmodified response when the query produced no data frame.
        """
        q = f"select * from {table_name} limit 1"
        result = self.native_query(q)

        # A failed query yields a response without a usable data frame; hand
        # it back as-is so the caller sees the original error.
        data_frame = getattr(result, 'data_frame', None)
        if data_frame is None:
            return result

        result.data_frame = pd.DataFrame({'column_name': list(data_frame.columns)})
        return result