Coverage for mindsdb / integrations / handlers / slack_handler / slack_handler.py: 88%
136 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from copy import deepcopy
2import datetime as dt
3import os
4import threading
5from typing import Any, Callable, Dict, List, Text
7import pandas as pd
8from slack_sdk import WebClient
9from slack_sdk.errors import SlackApiError
10from slack_sdk.socket_mode import SocketModeClient
11from slack_sdk.socket_mode.response import SocketModeResponse
12from slack_sdk.socket_mode.request import SocketModeRequest
14from mindsdb.integrations.handlers.slack_handler.slack_tables import (
15 SlackConversationsTable,
16 SlackMessagesTable,
17 SlackThreadsTable,
18 SlackUsersTable
19)
20from mindsdb.integrations.libs.api_handler import APIChatHandler, FuncParser
21from mindsdb.integrations.libs.response import (
22 HandlerResponse as Response,
23 HandlerStatusResponse as StatusResponse,
24 RESPONSE_TYPE
25)
26from mindsdb.utilities.config import Config
27from mindsdb.utilities import log
29logger = log.getLogger(__name__)
class SlackHandler(APIChatHandler):
    """
    This handler handles the connection and execution of SQL statements on Slack.
    Additionally, it allows the setup of a real-time connection to the Slack API using the Socket Mode for chat-bots.
    """

    def __init__(self, name: Text, connection_data: Dict, **kwargs: Any) -> None:
        """
        Initializes the handler.

        Args:
            name (Text): The name of the handler instance.
            connection_data (Dict): The connection data required to connect to the Slack API.
                The keys read by this handler are 'token' and 'app_token'.
            kwargs(Any): Arbitrary keyword arguments.
        """
        super().__init__(name)
        self.connection_data = connection_data
        self.kwargs = kwargs

        # If the parameters are not provided, check the environment variables and the handler configuration.
        # Precedence: explicit connection data, then SLACK_<KEY> environment variable, then handler config.
        handler_config = Config().get('slack_handler', {})

        for key in ['token', 'app_token']:
            if key not in self.connection_data:
                env_var = f'SLACK_{key.upper()}'
                if env_var in os.environ:
                    self.connection_data[key] = os.environ[env_var]
                elif key in handler_config:
                    self.connection_data[key] = handler_config[key]

        self.web_connection = None
        self._socket_connection = None
        self.is_connected = False

        self._register_table('conversations', SlackConversationsTable(self))
        self._register_table('messages', SlackMessagesTable(self))
        self._register_table('threads', SlackThreadsTable(self))
        self._register_table('users', SlackUsersTable(self))

    def connect(self) -> WebClient:
        """
        Establishes a connection to the Slack API using the WebClient.

        The client is created once and reused for as long as `is_connected` stays True.

        Raises:
            ValueError: If the mandatory 'token' parameter is not available.

        Returns:
            WebClient: The WebClient object to interact with the Slack API.
        """
        if self.is_connected is True:
            return self.web_connection

        # Check if the mandatory connection parameter (token) is available.
        if 'token' not in self.connection_data:
            raise ValueError('Required parameter (token) must be provided.')

        try:
            self.web_connection = WebClient(token=self.connection_data['token'])
            self.is_connected = True
            return self.web_connection
        except Exception as unknown_error:
            logger.error(f'Unknown error connecting to Slack API: {unknown_error}')
            raise

    def check_connection(self) -> StatusResponse:
        """
        Checks the status of the connection to the Slack API, both for the WebClient and the Socket Mode.

        No exception escapes this method: any failure is logged and reported through the returned object.

        Returns:
            StatusResponse: An object containing the success status and an error message if an error occurs.
        """
        response = StatusResponse(False)

        try:
            web_connection = self.connect()
            # Check the status of the web connection.
            web_connection.auth_test()

            # Check the status of the socket connection if the app_token is provided.
            if 'app_token' in self.connection_data:
                socket_connection = SocketModeClient(
                    app_token=self.connection_data['app_token'],
                    web_client=web_connection
                )
                try:
                    socket_connection.connect()
                finally:
                    # Always release the probe client, even if connect() fails. close() is the same
                    # full-shutdown call that `subscribe` uses; disconnect() only drops the current session.
                    socket_connection.close()

            response.success = True
        except (SlackApiError, ValueError) as known_error:
            logger.error(f'Connection check to the Slack API failed, {known_error}!')
            response.error_message = str(known_error)
        except Exception as unknown_error:
            logger.error(f'Connection check to the Slack API failed due to an unknown error, {unknown_error}!')
            response.error_message = str(unknown_error)

        # Force the next connect() call to build a fresh WebClient after a failed check.
        if response.success is False and self.is_connected is True:
            self.is_connected = False

        return response

    def native_query(self, query: Text = None) -> Response:
        """
        Executes native Slack SDK methods as specified in the query string.

        Args:
            query: The query string containing the method name and parameters.

        Returns:
            Response: A response object containing the result of the query.
        """
        method_name, params = FuncParser().from_string(query)

        df = self._call_slack_api(method_name, params)

        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=df
        )

    def _call_slack_api(self, method_name: Text = None, params: Dict = None) -> pd.DataFrame:
        """
        Calls the Slack SDK method with the specified method name and parameters.

        All pages are fetched when the response advertises a pagination cursor.

        Args:
            method_name (Text): The name of the method to call.
            params (Dict): The parameters to pass to the method. None is treated as no parameters.

        Raises:
            SlackApiError: If an error occurs while calling the Slack SDK method
            ValueError: If a response could not be parsed into data items.

        Returns:
            pd.DataFrame: The items returned by the Slack SDK method; empty if there were no items.
        """
        web_connection = self.connect()
        method = getattr(web_connection, method_name)
        # Allow the method to be called without parameters.
        params = params or {}

        items = []
        try:
            response = method(**params)
            # Work on a copy because the extraction step pops keys from the dict.
            items.extend(self._extract_data_from_response(deepcopy(response.data)))

            # If the response contains a cursor, fetch the next page of results.
            # The cursor is re-validated on every page so a page without metadata ends the loop.
            next_cursor = self._get_next_cursor(response)
            while next_cursor:
                response = method(cursor=next_cursor, **params)
                items.extend(self._extract_data_from_response(deepcopy(response.data)))
                next_cursor = self._get_next_cursor(response)
        except SlackApiError as slack_error:
            error = f"Error calling method '{method_name}' with params '{params}': {slack_error.response['error']}"
            logger.error(error)
            raise

        # Always build the frame: an empty result yields an empty DataFrame instead of an unbound variable.
        return pd.DataFrame(items)

    @staticmethod
    def _get_next_cursor(response: Any) -> Any:
        """
        Reads the pagination cursor from a Slack response.

        Args:
            response (Any): The response object returned by a Slack SDK method.

        Returns:
            Any: The value of response_metadata.next_cursor, or None if the response does not carry one.
        """
        if 'response_metadata' in response:
            metadata = response['response_metadata']
            if metadata and 'next_cursor' in metadata:
                return metadata['next_cursor']
        return None

    def _extract_data_from_response(self, response_data: Dict) -> List[Dict]:
        """
        Extracts the data items from the response object.

        Note that the metadata keys are removed from `response_data` in place.

        Args:
            response_data (Dict): The response object containing the data items.

        Raises:
            ValueError: If the response data could not be parsed.

        Returns:
            List[Dict]: The data items extracted from the response object.
        """
        # Remove the metadata from the response.
        for key in ['ok', 'response_metadata', 'cache_ts', 'latest', 'pin_count', 'has_more']:
            response_data.pop(key, None)

        # If the response contains only one key, return the value of that key as a list.
        if len(response_data) == 1:
            (value,) = response_data.values()
            if isinstance(value, list):
                return value
            return [value]

        # Otherwise, raise an error.
        raise ValueError('Response data could not be parsed.')

    def get_chat_config(self) -> Dict:
        """
        Returns the chat configuration for the Slack handler.

        Returns:
            Dict: The chat configuration.
        """
        return {
            'polling': {
                'type': 'realtime',
            },
            'memory': {
                'type': 'handler',
            },
            'tables': [
                {
                    'chat_table': {
                        'name': 'messages',
                        'chat_id_col': 'channel_id',
                        'username_col': 'user',
                        'text_col': 'text',
                        'time_col': 'created_at',
                    }
                },
                {
                    'chat_table': {
                        'name': 'threads',
                        'chat_id_col': ['channel_id', 'thread_ts'],
                        'username_col': 'user',
                        'text_col': 'text',
                        'time_col': 'thread_ts',
                    }
                }
            ]
        }

    def get_my_user_name(self) -> Text:
        """
        Gets the identifier of the bot user.

        Despite the method name, the value returned is the 'bot_id' field of the auth test response.

        Returns:
            Text: The bot ID of the bot user.
        """
        web_connection = self.connect()
        user_info = web_connection.auth_test().data
        return user_info['bot_id']

    def subscribe(self, stop_event: threading.Event, callback: Callable, table_name: Text = 'messages',
                  columns: List = None, **kwargs: Any) -> None:
        """
        Subscribes to the Slack API using the Socket Mode for real-time responses to messages.

        This call blocks until `stop_event` is set.

        Args:
            stop_event (threading.Event): The event to stop the subscription.
            callback (Callable): The callback function to process the messages.
            table_name (Text): The name of the table to subscribe to.
            columns (List): Not supported; must be empty or None.
            kwargs: Arbitrary keyword arguments.

        Raises:
            RuntimeError: If the table is not supported for subscription or columns are provided.
        """
        if table_name not in ['messages', 'threads']:
            raise RuntimeError(f'Table {table_name} is not supported for subscription.')

        # Raise an error if columns are provided.
        # Since Slack subscriptions depend on events and not changes to the virtual tables, columns are not supported.
        if columns:
            raise RuntimeError('Columns are not supported for Slack subscriptions.')

        self._socket_connection = SocketModeClient(
            # This app-level token will be used only for establishing a connection.
            app_token=self.connection_data['app_token'],  # xapp-A111-222-xyz
            # The WebClient for performing Web API calls in listeners.
            web_client=WebClient(token=self.connection_data['token']),  # xoxb-111-222-xyz
        )

        def _process_websocket_message(client: SocketModeClient, request: SocketModeRequest) -> None:
            """
            Pre-processes the incoming WebSocket message from the Slack API and calls the callback function to process the message.

            Args:
                client (SocketModeClient): The client object to send the response.
                request (SocketModeRequest): The request object containing the payload.
            """
            # Acknowledge the request.
            response = SocketModeResponse(envelope_id=request.envelope_id)
            client.send_socket_mode_response(response)

            # Ignore requests that are not events.
            if request.type != 'events_api':
                return

            # Ignore duplicate requests.
            if request.retry_attempt is not None and request.retry_attempt > 0:
                return

            payload_event = request.payload['event']
            # Avoid responding to events other than direct messages and app mentions.
            if payload_event['type'] not in ('message', 'app_mention'):
                return

            # Avoid responding to unrelated events like message_changed, message_deleted, etc.
            if 'subtype' in payload_event:
                return

            # Avoid responding to messages from the bot.
            if 'bot_id' in payload_event:
                return

            key = {
                'channel_id': payload_event['channel'],
            }

            row = {
                'text': payload_event['text'],
                'user': payload_event['user'],
                'created_at': dt.datetime.fromtimestamp(float(payload_event['ts'])).strftime('%Y-%m-%d %H:%M:%S')
            }

            # Add thread_ts to the key if it is a thread message. This is used to identify threads.
            # This message should be handled via the threads table.
            if 'thread_ts' in payload_event:
                key['thread_ts'] = payload_event['thread_ts']

            callback(row, key)

        self._socket_connection.socket_mode_request_listeners.append(_process_websocket_message)

        try:
            self._socket_connection.connect()
            stop_event.wait()
        finally:
            # Release the socket client even if connecting or waiting fails.
            self._socket_connection.close()