Coverage for mindsdb / integrations / handlers / solace_handler / solace_handler.py: 0%
97 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
import pandas as pd

# Solace messaging client: service/session, topics, and receiver callbacks.
from solace.messaging.messaging_service import MessagingService, RetryStrategy
from solace.messaging.resources.topic_subscription import TopicSubscription
from solace.messaging.receiver.message_receiver import MessageHandler
from solace.messaging.resources.topic import Topic
from solace.messaging.receiver.inbound_message import InboundMessage

# SQL parsing: `parse_sql` turns a query string into the `ast` node types.
from mindsdb_sql_parser import parse_sql
from mindsdb_sql_parser import ast

# MindsDB handler base class and response containers.
from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE
from mindsdb.integrations.libs.base import DatabaseHandler
from mindsdb.integrations.libs.response import (
    HandlerStatusResponse as StatusResponse,
    HandlerResponse as Response,
)
from mindsdb.utilities import log

# Module-level logger shared by everything in this file.
logger = log.getLogger(__name__)


class SolaceHandler(DatabaseHandler):
    """MindsDB handler that exchanges messages with a Solace broker.

    ``INSERT`` queries are published as direct messages to the topic named by
    the target table, and ``subscribe`` forwards inbound messages from a topic
    to a callback. No other query type is supported.
    """

    def __init__(self, name: str = None, **kwargs):
        """Store the connection arguments; no connection is opened here.

        Args:
            name (str): The handler name to use.
            **kwargs: ``connection_data`` is read from here. It must be a
                mapping containing ``host``; ``vpn-name``, ``username`` and
                ``password`` are read later by ``connect``.

        Raises:
            ValueError: If ``host`` is missing from the connection data.
        """
        super().__init__(name)
        # An explicit ``connection_data=None`` is treated like a missing
        # argument so the caller gets the ValueError below, not a TypeError.
        args = kwargs.get('connection_data') or {}

        if 'host' not in args:
            raise ValueError('Host parameter is required')

        self.connection_args = args
        self.messaging_service = None
        # Defined up front so the attribute exists before the first connect().
        self.direct_publisher = None
        self.is_connected = False

    def connect(self):
        """Connect to the broker and start a direct message publisher.

        The call is idempotent: when the handler is already connected the
        existing service is returned, so repeated calls (for example from
        ``check_connection``) do not open additional sessions.

        Returns:
            The connected ``MessagingService`` instance.
        """
        if self.is_connected is True and self.messaging_service is not None:
            return self.messaging_service

        broker_props = {
            "solace.messaging.transport.host": self.connection_args['host'],
            "solace.messaging.service.vpn-name": self.connection_args.get('vpn-name', 'default'),
            "solace.messaging.authentication.scheme.basic.username": self.connection_args.get('username'),
            "solace.messaging.authentication.scheme.basic.password": self.connection_args.get('password'),
        }

        # NOTE(review): presumably 20 retries with an interval of 3 (units as
        # defined by the Solace API) - confirm against the library docs.
        messaging_service = (
            MessagingService.builder()
            .from_properties(broker_props)
            .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20, 3))
            .build()
        )
        messaging_service.connect()

        try:
            direct_publisher = messaging_service.create_direct_message_publisher_builder().build()
            direct_publisher.start()
        except Exception:
            # Do not leave a connected session behind when the publisher
            # cannot be created or started.
            messaging_service.disconnect()
            raise

        # TODO support persistent_publisher ?

        # Publish the new objects on the instance only once everything is up.
        self.messaging_service = messaging_service
        self.direct_publisher = direct_publisher
        self.is_connected = True
        return self.messaging_service

    def disconnect(self):
        """Stop the publisher and close the broker session.

        Returns:
            ``None`` when the handler was not connected, otherwise ``False``
            (the new value of ``is_connected``).
        """
        if self.is_connected is False:
            return

        # Flip the flag first so a failure during teardown cannot leave the
        # handler claiming a live connection.
        self.is_connected = False
        try:
            self.direct_publisher.terminate()
        finally:
            # Close the session even if the publisher fails to stop cleanly.
            self.messaging_service.disconnect()
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """Report whether a connection to the broker can be established.

        Returns:
            A ``StatusResponse`` whose ``success`` is ``True`` when ``connect``
            did not raise; otherwise ``error_message`` holds the error text.
        """
        response = StatusResponse(False)

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Solace: {e}!')
            # Store the text, not the exception object, in the response.
            response.error_message = str(e)

        if response.success is False:
            self.is_connected = False
        return response

    def native_query(self, query: str = None) -> Response:
        """Parse a SQL string and execute it through ``query``.

        Args:
            query (str): SQL text to parse with ``parse_sql``.

        Returns:
            Whatever ``query`` returns for the parsed statement.
        """
        # Named so that it does not shadow the imported ``ast`` module.
        parsed_query = parse_sql(query)
        return self.query(parsed_query)

    def query(self, query: ast.ASTNode):
        """Execute a parsed query; only ``INSERT`` statements are supported.

        Args:
            query (ast.ASTNode): The parsed statement.

        Returns:
            The response produced by ``handle_insert``.

        Raises:
            NotImplementedError: For any statement that is not an ``INSERT``.
        """
        if isinstance(query, ast.Insert):
            return self.handle_insert(query)
        raise NotImplementedError(
            f'Only INSERT queries are supported, got {type(query).__name__}'
        )

    def get_columns(self, table_name: str) -> Response:
        """Return an empty column listing with ``Field`` and ``Type`` columns.

        Args:
            table_name (str): Unused; the same empty listing is returned for
                every table.
        """
        df = pd.DataFrame([], columns=['Field'])
        df['Type'] = 'str'

        return Response(RESPONSE_TYPE.TABLE, df)

    def get_tables(self) -> Response:
        """Return an empty table listing (``table_name``, ``table_type``)."""
        df = pd.DataFrame([], columns=['table_name', 'table_type'])

        return Response(RESPONSE_TYPE.TABLE, df)

    def handle_insert(self, query: ast.Insert):
        """Publish every inserted row as one direct message.

        The topic name is the parts of the target table joined with ``/``.
        Each row becomes a dict keyed by the column names of the statement.

        Args:
            query (ast.Insert): The parsed ``INSERT`` statement.

        Returns:
            A ``Response`` of type ``RESPONSE_TYPE.OK``.

        Raises:
            ValueError: If the statement has no column list.
        """
        if query.columns is None:
            # Without column names there are no keys to build the payload.
            raise ValueError('INSERT into a Solace topic requires an explicit column list')

        # Make sure the service and publisher exist; no-op when connected.
        self.connect()

        message_builder = self.messaging_service.message_builder()

        # The destination is the same for every row, so build it once.
        destination = Topic.of('/'.join(query.table.parts))

        column_names = [col.name for col in query.columns]
        for insert_row in query.values:
            data = dict(zip(column_names, insert_row))

            outbound_message = message_builder.build(data)
            self.direct_publisher.publish(destination=destination, message=outbound_message)

        return Response(RESPONSE_TYPE.OK)

    def subscribe(self, stop_event, callback, table_name, columns=None, **kwargs):
        """Forward messages from a topic to ``callback`` until told to stop.

        Blocks on ``stop_event.wait()``; the receiver is always terminated
        afterwards, including when an error interrupts the wait.

        Args:
            stop_event: Object whose ``wait()`` blocks until the subscription
                should end (presumably a ``threading.Event`` - confirm with
                the caller).
            callback: Called with one dict per accepted message.
            table_name: Identifier whose parts, joined with ``/``, form the
                topic name.
            columns: Optional iterable of keys; when given, a message is
                skipped unless its data contains at least one of them.
            **kwargs: Ignored.
        """
        # Build the filter set once instead of once per message.
        wanted_columns = set(columns) if columns is not None else None

        class MessageHandlerImpl(MessageHandler):
            def on_message(self, message: 'InboundMessage'):
                # Prefer a dictionary payload; otherwise fall back to the
                # string form, then to raw bytes, wrapped under 'data'.
                payload = message.get_payload_as_dictionary()
                if payload is not None:
                    data = payload
                else:
                    payload = message.get_payload_as_string()
                    if payload is None:
                        payload = message.get_payload_as_bytes()
                        if isinstance(payload, (bytes, bytearray)):
                            payload = payload.decode()
                    data = {'data': payload}

                # Skip messages that carry none of the requested keys.
                if wanted_columns is not None and wanted_columns.isdisjoint(data):
                    return

                callback(data)

        # Make sure the messaging service exists; no-op when connected.
        self.connect()

        table = ast.Identifier(table_name)
        topic_name = '/'.join(table.parts)

        topics = [TopicSubscription.of(topic_name)]
        direct_receiver = (
            self.messaging_service.create_direct_message_receiver_builder()
            .with_subscriptions(topics)
            .build()
        )

        direct_receiver.start()
        try:
            direct_receiver.receive_async(MessageHandlerImpl())
            stop_event.wait()
        finally:
            # Release the receiver even if registration or the wait fails.
            direct_receiver.terminate()