Coverage for mindsdb / integrations / handlers / slack_handler / slack_handler.py: 88%

136 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from copy import deepcopy 

2import datetime as dt 

3import os 

4import threading 

5from typing import Any, Callable, Dict, List, Text 

6 

7import pandas as pd 

8from slack_sdk import WebClient 

9from slack_sdk.errors import SlackApiError 

10from slack_sdk.socket_mode import SocketModeClient 

11from slack_sdk.socket_mode.response import SocketModeResponse 

12from slack_sdk.socket_mode.request import SocketModeRequest 

13 

14from mindsdb.integrations.handlers.slack_handler.slack_tables import ( 

15 SlackConversationsTable, 

16 SlackMessagesTable, 

17 SlackThreadsTable, 

18 SlackUsersTable 

19) 

20from mindsdb.integrations.libs.api_handler import APIChatHandler, FuncParser 

21from mindsdb.integrations.libs.response import ( 

22 HandlerResponse as Response, 

23 HandlerStatusResponse as StatusResponse, 

24 RESPONSE_TYPE 

25) 

26from mindsdb.utilities.config import Config 

27from mindsdb.utilities import log 

28 

29logger = log.getLogger(__name__) 

30 

31 

32class SlackHandler(APIChatHandler): 

33 """ 

34 This handler handles the connection and execution of SQL statements on Slack. 

35 Additionally, it allows the setup of a real-time connection to the Slack API using the Socket Mode for chat-bots. 

36 """ 

37 

38 def __init__(self, name: Text, connection_data: Dict, **kwargs: Any) -> None: 

39 """ 

40 Initializes the handler. 

41 

42 Args: 

43 name (Text): The name of the handler instance. 

44 connection_data (Dict): The connection data required to connect to the SAP HANA database. 

45 kwargs(Any): Arbitrary keyword arguments. 

46 """ 

47 super().__init__(name) 

48 self.connection_data = connection_data 

49 self.kwargs = kwargs 

50 

51 # If the parameters are not provided, check the environment variables and the handler configuration. 

52 handler_config = Config().get('slack_handler', {}) 

53 

54 for key in ['token', 'app_token']: 

55 if key not in self.connection_data: 

56 if f'SLACK_{key.upper()}' in os.environ: 56 ↛ 57line 56 didn't jump to line 57 because the condition on line 56 was never true

57 self.connection_data[key] = os.environ[f'SLACK_{key.upper()}'] 

58 elif key in handler_config: 58 ↛ 59line 58 didn't jump to line 59 because the condition on line 58 was never true

59 self.connection_data[key] = handler_config[key] 

60 

61 self.web_connection = None 

62 self._socket_connection = None 

63 self.is_connected = False 

64 

65 self._register_table('conversations', SlackConversationsTable(self)) 

66 self._register_table('messages', SlackMessagesTable(self)) 

67 self._register_table('threads', SlackThreadsTable(self)) 

68 self._register_table('users', SlackUsersTable(self)) 

69 

70 def connect(self) -> WebClient: 

71 """ 

72 Establishes a connection to the Slack API using the WebClient. 

73 

74 Returns: 

75 WebClient: The WebClient object to interact with the Slack API. 

76 """ 

77 if self.is_connected is True: 

78 return self.web_connection 

79 

80 # Check if the mandatory connection parameter (token) is available. 

81 if 'token' not in self.connection_data: 

82 raise ValueError('Required parameter (token) must be provided.') 

83 

84 try: 

85 self.web_connection = WebClient(token=self.connection_data['token']) 

86 self.is_connected = True 

87 return self.web_connection 

88 except Exception as unknown_error: 

89 logger.error(f'Unknown error connecting to Slack API: {unknown_error}') 

90 raise 

91 

92 def check_connection(self) -> StatusResponse: 

93 """ 

94 Checks the status of the connection to the Slack API, both for the WebClient and the Socket Mode. 

95 

96 Raises: 

97 SlackApiError: If an error occurs while connecting to the Slack API. 

98 

99 Returns: 

100 StatusResponse: An object containing the success status and an error message if an error occurs. 

101 """ 

102 response = StatusResponse(False) 

103 

104 try: 

105 web_connection = self.connect() 

106 # Check the status of the web connection. 

107 web_connection.auth_test() 

108 

109 # Check the status of the socket connection if the app_token is provided. 

110 if 'app_token' in self.connection_data: 110 ↛ 118line 110 didn't jump to line 118 because the condition on line 110 was always true

111 _socket_connection = SocketModeClient( 

112 app_token=self.connection_data['app_token'], 

113 web_client=web_connection 

114 ) 

115 _socket_connection.connect() 

116 _socket_connection.disconnect() 

117 

118 response.success = True 

119 except (SlackApiError, ValueError) as known_error: 

120 logger.error(f'Connection check to the Slack API failed, {known_error}!') 

121 response.error_message = str(known_error) 

122 except Exception as unknown_error: 

123 logger.error(f'Connection check to the Slack API failed due to an unknown error, {unknown_error}!') 

124 response.error_message = str(unknown_error) 

125 

126 if response.success is False and self.is_connected is True: 

127 self.is_connected = False 

128 

129 return response 

130 

131 def native_query(self, query: Text = None) -> Response: 

132 """ 

133 Executes native Slack SDK methods as specified in the query string. 

134 

135 Args: 

136 query: The query string containing the method name and parameters. 

137 

138 Returns: 

139 Response: A response object containing the result of the query. 

140 """ 

141 method_name, params = FuncParser().from_string(query) 

142 

143 df = self._call_slack_api(method_name, params) 

144 

145 return Response( 

146 RESPONSE_TYPE.TABLE, 

147 data_frame=df 

148 ) 

149 

150 def _call_slack_api(self, method_name: Text = None, params: Dict = None) -> List[Dict]: 

151 """ 

152 Calls the Slack SDK method with the specified method name and parameters. 

153 

154 Args: 

155 method_name (Text): The name of the method to call. 

156 params (Dict): The parameters to pass to the method. 

157 

158 Raises: 

159 SlackApiError: If an error occurs while calling the Slack SDK method 

160 

161 Returns: 

162 List[Dict]: The result from running the Slack SDK method. 

163 """ 

164 web_connection = self.connect() 

165 method = getattr(web_connection, method_name) 

166 

167 items = [] 

168 try: 

169 response = method(**params) 

170 response_data = deepcopy(response.data) 

171 

172 # Get only the data items from the response data. 

173 items.extend(self._extract_data_from_response(response_data)) 

174 

175 # If the response contains a cursor, fetch the next page of results. 

176 if 'response_metadata' in response and 'next_cursor' in response['response_metadata']: 

177 while response['response_metadata']['next_cursor']: 

178 response = method(cursor=response['response_metadata']['next_cursor'], **params) 

179 response_data = deepcopy(response.data) 

180 

181 # Get only the data items from the response data. 

182 items.extend(self._extract_data_from_response(response_data)) 

183 except SlackApiError as slack_error: 

184 error = f"Error calling method '{method_name}' with params '{params}': {slack_error.response['error']}" 

185 logger.error(error) 

186 raise 

187 

188 if items: 188 ↛ 191line 188 didn't jump to line 191 because the condition on line 188 was always true

189 df = pd.DataFrame(items) 

190 

191 return df 

192 

193 def _extract_data_from_response(self, response_data: Dict) -> List[Dict]: 

194 """ 

195 Extracts the data items from the response object. 

196 

197 Args: 

198 response_data (Dict): The response object containing the data items. 

199 

200 Raises: 

201 ValueError: If the response data could not be parsed. 

202 

203 Returns: 

204 List[Dict]: The data items extracted from the response object. 

205 """ 

206 # Remove the metadata from the response. 

207 for key in ['ok', 'response_metadata', 'cache_ts', 'latest', 'pin_count', 'has_more']: 

208 if key in response_data: 

209 response_data.pop(key) 

210 

211 # If the response contains only one key, return the value of that key as a list. 

212 if len(response_data) == 1: 

213 key = list(response_data.keys())[0] 

214 if isinstance(response_data[key], list): 

215 return response_data[key] 

216 

217 else: 

218 return [response_data[key]] 

219 

220 # Otherwise, raise an error. 

221 raise ValueError('Response data could not be parsed.') 

222 

223 def get_chat_config(self) -> Dict: 

224 """ 

225 Returns the chat configuration for the Slack handler. 

226 

227 Returns: 

228 Dict: The chat configuration. 

229 """ 

230 return { 

231 'polling': { 

232 'type': 'realtime', 

233 }, 

234 'memory': { 

235 'type': 'handler', 

236 }, 

237 'tables': [ 

238 { 

239 'chat_table': { 

240 'name': 'messages', 

241 'chat_id_col': 'channel_id', 

242 'username_col': 'user', 

243 'text_col': 'text', 

244 'time_col': 'created_at', 

245 } 

246 }, 

247 { 

248 'chat_table': { 

249 'name': 'threads', 

250 'chat_id_col': ['channel_id', 'thread_ts'], 

251 'username_col': 'user', 

252 'text_col': 'text', 

253 'time_col': 'thread_ts', 

254 } 

255 } 

256 ] 

257 } 

258 

259 def get_my_user_name(self) -> Text: 

260 """ 

261 Gets the name of the bot user. 

262 

263 Returns: 

264 Text: The name of the bot user. 

265 """ 

266 web_connection = self.connect() 

267 user_info = web_connection.auth_test().data 

268 return user_info['bot_id'] 

269 

270 def subscribe(self, stop_event: threading.Event, callback: Callable, table_name: Text = 'messages', 

271 columns: List = None, **kwargs: Any) -> None: 

272 """ 

273 Subscribes to the Slack API using the Socket Mode for real-time responses to messages. 

274 

275 Args: 

276 stop_event (threading.Event): The event to stop the subscription. 

277 callback (Callable): The callback function to process the messages. 

278 table_name (Text): The name of the table to subscribe to. 

279 kwargs: Arbitrary keyword arguments. 

280 """ 

281 if table_name not in ['messages', 'threads']: 

282 raise RuntimeError(f'Table {table_name} is not supported for subscription.') 

283 

284 # Raise an error if columns are provided. 

285 # Since Slack subscriptions depend on events and not changes to the virtual tables, columns are not supported. 

286 if columns: 

287 raise RuntimeError('Columns are not supported for Slack subscriptions.') 

288 

289 self._socket_connection = SocketModeClient( 

290 # This app-level token will be used only for establishing a connection. 

291 app_token=self.connection_data['app_token'], # xapp-A111-222-xyz 

292 # The WebClient for performing Web API calls in listeners. 

293 web_client=WebClient(token=self.connection_data['token']), # xoxb-111-222-xyz 

294 ) 

295 

296 def _process_websocket_message(client: SocketModeClient, request: SocketModeRequest) -> None: 

297 """ 

298 Pre-processes the incoming WebSocket message from the Slack API and calls the callback function to process the message. 

299 

300 Args: 

301 client (SocketModeClient): The client object to send the response. 

302 request (SocketModeRequest): The request object containing the payload. 

303 """ 

304 # Acknowledge the request. 

305 response = SocketModeResponse(envelope_id=request.envelope_id) 

306 client.send_socket_mode_response(response) 

307 

308 # Ignore requests that are not events. 

309 if request.type != 'events_api': 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true

310 return 

311 

312 # Ignore duplicate requests. 

313 if request.retry_attempt is not None and request.retry_attempt > 0: 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true

314 return 

315 

316 payload_event = request.payload['event'] 

317 # Avoid responding to events other than direct messages and app mentions. 

318 if payload_event['type'] not in ('message', 'app_mention'): 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true

319 return 

320 

321 # Avoid responding to unrelated events like message_changed, message_deleted, etc. 

322 if 'subtype' in payload_event: 322 ↛ 323line 322 didn't jump to line 323 because the condition on line 322 was never true

323 return 

324 

325 # Avoid responding to messages from the bot. 

326 if 'bot_id' in payload_event: 326 ↛ 327line 326 didn't jump to line 327 because the condition on line 326 was never true

327 return 

328 

329 key = { 

330 'channel_id': payload_event['channel'], 

331 } 

332 

333 row = { 

334 'text': payload_event['text'], 

335 'user': payload_event['user'], 

336 'created_at': dt.datetime.fromtimestamp(float(payload_event['ts'])).strftime('%Y-%m-%d %H:%M:%S') 

337 } 

338 

339 # Add thread_ts to the key and row if it is a thread message. This is used to identify threads. 

340 # This message should be handled via the threads table. 

341 if 'thread_ts' in payload_event: 341 ↛ 342line 341 didn't jump to line 342 because the condition on line 341 was never true

342 key['thread_ts'] = payload_event['thread_ts'] 

343 

344 callback(row, key) 

345 

346 self._socket_connection.socket_mode_request_listeners.append(_process_websocket_message) 

347 self._socket_connection.connect() 

348 

349 stop_event.wait() 

350 

351 self._socket_connection.close()