Coverage for mindsdb / integrations / handlers / solace_handler / solace_handler.py: 0%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2 

3from solace.messaging.messaging_service import MessagingService, RetryStrategy 

4from solace.messaging.resources.topic_subscription import TopicSubscription 

5from solace.messaging.receiver.message_receiver import MessageHandler 

6from solace.messaging.resources.topic import Topic 

7from solace.messaging.receiver.inbound_message import InboundMessage 

8 

9from mindsdb_sql_parser import parse_sql 

10from mindsdb_sql_parser import ast 

11 

12from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE 

13from mindsdb.integrations.libs.base import DatabaseHandler 

14from mindsdb.integrations.libs.response import ( 

15 HandlerStatusResponse as StatusResponse, 

16 HandlerResponse as Response, 

17) 

18from mindsdb.utilities import log 

19 

20logger = log.getLogger(__name__) 

21 

22 

class SolaceHandler(DatabaseHandler):
    """MindsDB handler that publishes to and subscribes from Solace topics.

    ``INSERT`` statements are turned into direct messages published to the
    topic named by the target table (identifier parts joined with ``/``).
    ``subscribe`` forwards incoming direct messages to a callback.
    """

    def __init__(self, name: str = None, **kwargs):
        """Store the connection arguments; no connection is opened here.

        Args:
            name (str): The handler name to use.
            **kwargs: ``connection_data`` is a dict of connection parameters.
                ``host`` is required; ``vpn-name``, ``username`` and
                ``password`` are read by ``connect``.

        Raises:
            ValueError: If ``host`` is missing from ``connection_data``.
        """
        super().__init__(name)
        # `or {}` also covers an explicit connection_data=None, which would
        # otherwise fail the membership test below with a TypeError.
        args = kwargs.get('connection_data') or {}

        if 'host' not in args:
            raise ValueError('Host parameter is required')

        self.connection_args = args
        self.messaging_service = None
        self.direct_publisher = None
        self.is_connected = False

    def connect(self):
        """Connect to the broker and start a direct message publisher.

        The call is idempotent: when the handler already holds a connected
        messaging service, that service is returned instead of building a
        second one (which would leak the first service and its publisher).

        Returns:
            The connected ``MessagingService`` instance.
        """
        if (
            self.is_connected
            and self.messaging_service is not None
            and self.messaging_service.is_connected
        ):
            return self.messaging_service

        broker_props = {
            "solace.messaging.transport.host": self.connection_args['host'],
            "solace.messaging.service.vpn-name": self.connection_args.get('vpn-name', 'default'),
            "solace.messaging.authentication.scheme.basic.username": self.connection_args.get('username'),
            "solace.messaging.authentication.scheme.basic.password": self.connection_args.get('password'),
        }

        messaging_service = (
            MessagingService.builder()
            .from_properties(broker_props)
            .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20, 3))
            .build()
        )
        messaging_service.connect()

        try:
            direct_publisher = messaging_service.create_direct_message_publisher_builder().build()
            direct_publisher.start()
        except Exception:
            # Do not leave a connected service behind when the publisher
            # cannot be created or started.
            messaging_service.disconnect()
            raise

        # TODO support persistent_publisher ?

        # Publish the new objects on the handler only once everything is up.
        self.messaging_service = messaging_service
        self.direct_publisher = direct_publisher
        self.is_connected = True
        return self.messaging_service

    def disconnect(self):
        """Terminate the publisher and disconnect from the broker.

        Returns:
            ``None`` when the handler was not connected, otherwise the
            (now ``False``) ``is_connected`` flag.
        """
        if self.is_connected is False:
            return

        # Mark the handler as disconnected up front so that a failure during
        # teardown does not leave it flagged as usable.
        self.is_connected = False
        try:
            self.direct_publisher.terminate()
        finally:
            # Release the broker session even if the publisher fails to
            # terminate cleanly.
            self.messaging_service.disconnect()
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """Try to connect and report the outcome.

        Returns:
            StatusResponse: ``success`` is ``True`` when ``connect`` did not
            raise; otherwise ``error_message`` carries the error text.
        """
        response = StatusResponse(False)

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Solace: {e}!')
            # Store the text, not the exception object itself.
            response.error_message = str(e)
            self.is_connected = False

        return response

    def native_query(self, query: str = None) -> Response:
        """Parse a raw SQL string and execute it through ``query``.

        Args:
            query (str): SQL text to parse with ``parse_sql``.

        Returns:
            Response: The result of ``query`` for the parsed statement.
        """
        # Named `parsed_query` so the imported `ast` module is not shadowed.
        parsed_query = parse_sql(query)
        return self.query(parsed_query)

    def query(self, query: ast.ASTNode):
        """Execute a parsed statement; only ``INSERT`` is supported.

        Args:
            query (ast.ASTNode): The parsed statement.

        Returns:
            Response: The result of ``handle_insert``.

        Raises:
            NotImplementedError: For any statement other than ``ast.Insert``.
        """
        if isinstance(query, ast.Insert):
            return self.handle_insert(query)

        raise NotImplementedError(
            f'Only INSERT statements are supported, got: {type(query).__name__}'
        )

    def get_columns(self, table_name: str) -> Response:
        """Return an empty column listing with ``Field`` and ``Type`` columns.

        Args:
            table_name (str): Unused by this implementation.

        Returns:
            Response: A table response wrapping the empty DataFrame.
        """
        df = pd.DataFrame([], columns=['Field'])
        df['Type'] = 'str'

        return Response(RESPONSE_TYPE.TABLE, df)

    def get_tables(self) -> Response:
        """Return an empty table listing (``table_name``, ``table_type``).

        Returns:
            Response: A table response wrapping the empty DataFrame.
        """
        df = pd.DataFrame([], columns=['table_name', 'table_type'])

        return Response(RESPONSE_TYPE.TABLE, df)

    def handle_insert(self, query: ast.Insert):
        """Publish every inserted row as one direct message.

        The topic is the table identifier's parts joined with ``/``. Each row
        is sent as a dict that maps the statement's column names to the row's
        values.

        Args:
            query (ast.Insert): The parsed ``INSERT`` statement.

        Returns:
            Response: An ``OK`` response once all rows have been handed to
            the publisher.
        """
        # Make sure the service and publisher exist; connect() is idempotent.
        self.connect()

        message_builder = self.messaging_service.message_builder()

        topic_name = '/'.join(query.table.parts)
        # The destination is the same for every row, so build it once.
        destination = Topic.of(topic_name)

        column_names = [col.name for col in query.columns]
        for insert_row in query.values:
            data = dict(zip(column_names, insert_row))

            outbound_message = message_builder.build(data)
            self.direct_publisher.publish(destination=destination, message=outbound_message)

        return Response(RESPONSE_TYPE.OK)

    def subscribe(self, stop_event, callback, table_name, columns=None, **kwargs):
        """Forward messages from a topic to ``callback`` until stopped.

        Args:
            stop_event: Object whose ``wait()`` is called to block this
                method; the receiver is terminated once it returns.
            callback: Called with a dict for every accepted message.
            table_name: Table identifier; its parts joined with ``/`` form
                the topic subscription.
            columns: Optional collection of keys. When given, a message is
                skipped unless its data shares at least one key with it.
            **kwargs: Accepted for interface compatibility; unused.
        """

        class MessageHandlerImpl(MessageHandler):
            def on_message(self, message: 'InboundMessage'):
                # Prefer a dictionary payload; otherwise wrap the string or
                # bytes payload under the 'data' key.
                payload = message.get_payload_as_dictionary()
                if payload is not None:
                    data = payload
                else:
                    payload = message.get_payload_as_string()
                    if payload is None:
                        payload = message.get_payload_as_bytes()
                        if isinstance(payload, (bytes, bytearray)):
                            payload = payload.decode()
                    data = {'data': payload}

                # Skip messages that carry none of the requested columns.
                if columns is not None and set(columns).isdisjoint(data):
                    return

                callback(data)

        # Make sure the service exists; connect() is idempotent.
        self.connect()

        table = ast.Identifier(table_name)
        topic_name = '/'.join(table.parts)

        topics = [TopicSubscription.of(topic_name)]
        direct_receiver = (
            self.messaging_service
            .create_direct_message_receiver_builder()
            .with_subscriptions(topics)
            .build()
        )

        direct_receiver.start()
        try:
            direct_receiver.receive_async(MessageHandlerImpl())
            stop_event.wait()
        finally:
            # Always release the receiver, even if registering the handler or
            # waiting on the stop event raises.
            direct_receiver.terminate()