Coverage for mindsdb / integrations / handlers / slack_handler / slack_tables.py: 76%
270 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import datetime as dt
2from typing import Any, Dict, List, Text
4from mindsdb_sql_parser.ast import Delete, Insert, Update
5import pandas as pd
6from slack_sdk.errors import SlackApiError
8from mindsdb.integrations.libs.api_handler import APIResource
9from mindsdb.integrations.utilities.sql_utils import (
10 extract_comparison_conditions,
11 FilterCondition,
12 FilterOperator,
13 SortColumn,
14)
15from mindsdb.utilities import log
# Module-level logger; every table class below uses it to report Slack API errors before re-raising.
logger = log.getLogger(__name__)
class SlackConversationsTable(APIResource):
    """
    This is the table abstraction for interacting with conversations via the Slack API.
    """

    def list(self, conditions: List[FilterCondition] = None, limit: int = None, **kwargs: Any) -> pd.DataFrame:
        """
        Retrieves a list of Slack conversations based on the specified conditions.
        If no channel ID(s) are provided, all channels are retrieved as follows:
            - If the provided limit is greater than 1000, no limit to the API call is provided and the results are paginated until the limit is reached.
            - Otherwise, the provided limit or a default limit of 1000 is used when making the API call.

        Therefore, if a user is to retrieve more than 1000 channels, the limit should be set to a value greater than 1000.

        The above is designed to prevent rate limiting by the Slack API.

        Only conditions on the column 'id' are pushed down to the Slack API (and marked as applied);
        conditions on any other column are left unapplied.

        Args:
            conditions (List[FilterCondition]): The conditions to filter the conversations. May be None.
            limit (int): The limit of the conversations to return.
            kwargs(Any): Arbitrary keyword arguments.

        Raises:
            ValueError:
                - If an unsupported operator is used for the column 'id'.
                - If the channel ID(s) provided are not found.
            SlackApiError: If an error occurs when getting the channels from the Slack API.

        Returns:
            pd.DataFrame: The list of conversations.
        """
        channels = []
        # `conditions` defaults to None, so guard before iterating.
        for condition in conditions or []:
            if condition.column != "id":
                continue

            op = condition.op
            value = condition.value
            if op == FilterOperator.EQUAL:
                # get_channel raises ValueError if the channel cannot be found.
                channels = [self.get_channel(value)]
            elif op == FilterOperator.IN:
                channels = self._get_channels(value if isinstance(value, list) else [value])
            else:
                raise ValueError(f"Unsupported operator '{op}' for column 'id'")
            condition.applied = True

        # If no channel ID(s) are provided, get all channels with the specified limit.
        if not channels:
            channels = self._get_all_channels(limit)

        for channel in channels:
            # Tolerate conversations lacking either timestamp instead of failing the whole listing with KeyError.
            created = channel.get("created")
            updated = channel.get("updated")  # 'updated' is in milliseconds, 'created' in seconds.
            channel["created_at"] = dt.datetime.fromtimestamp(created) if created is not None else None
            channel["updated_at"] = dt.datetime.fromtimestamp(updated / 1000) if updated is not None else None

        return pd.DataFrame(channels, columns=self.get_columns())

    def get_channel(self, channel_id: Text) -> Dict:
        """
        Gets the channel data for the specified channel id.

        Args:
            channel_id (Text): The channel id.

        Raises:
            ValueError: If the channel ID is not found (any SlackApiError from the lookup is translated).

        Returns:
            Dict: The channel data.
        """
        client = self.handler.connect()

        try:
            response = client.conversations_info(channel=channel_id)
        except SlackApiError as slack_error:
            logger.error(f"Error getting channel '{channel_id}': {slack_error.response['error']}")
            raise ValueError(f"Channel '{channel_id}' not found") from slack_error

        return response["channel"]

    def _get_channels(self, channel_ids: List[Text]) -> List[Dict]:
        """
        Gets the channel data for multiple channel ids.
        As it is unlikely that a large number of channels will be provided, the API rate limits are ignored here.

        Args:
            channel_ids (List[Text]): The list of channel ids.

        Raises:
            ValueError: If a channel ID is not found.

        Returns:
            List[Dict]: The list of channel data, in the same order as `channel_ids`.
        """
        return [self.get_channel(channel_id) for channel_id in channel_ids]

    def _get_all_channels(self, limit: int = None) -> List[Dict]:
        """
        Gets the list of channels with a limit.
        If the provided limit is greater than 1000, no limit to the API call is provided and the results are paginated until the limit is reached.
        Otherwise, the provided limit or a default limit of 1000 is used when making the API call.

        Args:
            limit (int): The limit of channels to return.

        Raises:
            SlackApiError: If an error occurs when getting the channels from the Slack API.

        Returns:
            List[Dict]: The list of channels.
        """
        client = self.handler.connect()

        try:
            # If the limit is greater than 1000, paginate the results until the limit is reached.
            if limit and limit > 1000:
                response = client.conversations_list()
                channels = list(response["channels"])

                # Follow the cursor until enough channels are collected or there are no more pages.
                # 'response_metadata' may be missing or None, so read it defensively.
                while len(channels) < limit:
                    cursor = (response.get("response_metadata") or {}).get("next_cursor")
                    if not cursor:
                        break
                    response = client.conversations_list(cursor=cursor)
                    channels.extend(response["channels"])

                channels = channels[:limit]
            # Otherwise, use the provided limit or a default limit of 1000.
            else:
                response = client.conversations_list(limit=limit if limit else 1000)
                channels = response["channels"]
        except SlackApiError as slack_error:
            logger.error(f"Error getting channels: {slack_error.response['error']}")
            raise

        return channels

    def get_columns(self) -> List[str]:
        """
        Retrieves the attributes (columns) of the Slack conversations.

        Returns:
            List[str]: The list of columns.
        """
        return [
            "id",
            "name",
            "is_channel",
            "is_group",
            "is_im",
            "is_mpim",
            "is_private",
            "is_archived",
            "is_general",
            "is_shared",
            "is_ext_shared",
            "is_org_shared",
            "creator",
            "created_at",
            "updated_at",
        ]
class SlackMessagesTable(APIResource):
    """
    This is the table abstraction for interacting with messages via the Slack API.
    """

    def list(
        self, conditions: List[FilterCondition] = None, limit: int = None, sort: List[SortColumn] = None, **kwargs: Any
    ) -> pd.DataFrame:
        """
        Retrieves a list of messages from Slack conversations based on the specified conditions.

        `channel_id` is a required parameter to retrieve messages from a conversation.

        Messages are retrieved as follows for a given conversation:
            - If the provided limit is greater than 999, no limit to the API call is provided and the results are paginated until the limit is reached.
            - Otherwise, the provided limit or a default limit of 999 is used when making the API call.

        Therefore, if a user is to retrieve more than 999 messages, the limit should be set to a value greater than 999.
        The above is dependent on the other parameters provided in the conditions.

        The above is designed to prevent rate limiting by the Slack API.

        Conditions on 'created_at' using >, >=, < or <= are pushed down to the Slack API. Naive date values
        are interpreted as UTC; values with an explicit UTC offset keep that offset.

        Args:
            conditions (List[FilterCondition]): The conditions to filter the messages. May be None.
            limit (int): The limit of the messages to return.
            sort (List[SortColumn]): The columns to sort the messages by.
            kwargs (Any): Arbitrary keyword arguments.

        Raises:
            ValueError:
                - If the 'channel_id' parameter is not provided.
                - If an unsupported operator is used for the column 'channel_id'.
                - If the channel ID provided is not found.
            SlackApiError: If an error occurs when getting the messages from the Slack API.

        Returns:
            pd.DataFrame: The list of messages.
        """
        client = self.handler.connect()

        # Build the parameters for the call to the Slack API.
        params = {}
        channel = None
        for condition in conditions or []:
            value = condition.value
            op = condition.op

            # Handle the column 'channel_id'.
            if condition.column == "channel_id":
                if op != FilterOperator.EQUAL:
                    raise ValueError(f"Unsupported operator '{op}' for column 'channel_id'")

                # get_channel raises ValueError if the channel does not exist.
                channel = SlackConversationsTable(self.handler).get_channel(value)
                params["channel"] = value
                condition.applied = True

            # Handle the column 'created_at'.
            elif condition.column == "created_at" and value is not None:
                timestamp = self._to_utc_timestamp(value)
                # 'created_at' is exposed truncated to whole seconds, and 'oldest'/'latest' are sent without
                # 'inclusive', so the +1 offsets align the API-side bounds with comparisons on that truncated value.
                if op == FilterOperator.GREATER_THAN:
                    params["oldest"] = timestamp + 1
                elif op == FilterOperator.GREATER_THAN_OR_EQUAL:
                    params["oldest"] = timestamp
                elif op == FilterOperator.LESS_THAN:
                    params["latest"] = timestamp
                elif op == FilterOperator.LESS_THAN_OR_EQUAL:
                    params["latest"] = timestamp + 1
                else:
                    continue
                condition.applied = True

        if "channel" not in params:
            raise ValueError("To retrieve data from Slack, you need to provide the 'channel_id' parameter.")

        # Retrieve the messages from the Slack API.
        try:
            # If the limit is greater than 999, paginate the results until the limit is reached.
            if limit and limit > 999:
                params["limit"] = 999
                response = client.conversations_history(**params)
                messages = list(response["messages"])

                # Follow the cursor until enough messages are collected or there are no more pages.
                # 'response_metadata' may be missing or None, so read it defensively.
                while len(messages) < limit:
                    cursor = (response.get("response_metadata") or {}).get("next_cursor")
                    if not cursor:
                        break
                    response = client.conversations_history(cursor=cursor, **params)
                    messages.extend(response["messages"])

                messages = messages[:limit]
            # Otherwise, use the provided limit or a default limit of 999.
            else:
                params["limit"] = limit if limit else 999
                response = client.conversations_history(**params)
                messages = response["messages"]
        except SlackApiError as slack_error:
            logger.error(f"Error getting messages: {slack_error.response['error']}")
            raise

        result = pd.DataFrame(messages, columns=self.get_columns())

        # Copy so the column assignments below act on an independent frame, not a filtered view.
        result = result[result["text"].notnull()].copy()

        # Add the channel ID and name to the result.
        result["channel_id"] = params["channel"]
        result["channel_name"] = channel.get("name")

        # Translate the time stamp into a 'created_at' field.
        result["created_at"] = pd.to_datetime(result["ts"].astype(float), unit="s").dt.strftime("%Y-%m-%d %H:%M:%S")

        # Sort the messages by the specified columns.
        if sort:
            result.sort_values(by=[col.column for col in sort], ascending=[col.ascending for col in sort], inplace=True)

        return result

    @staticmethod
    def _to_utc_timestamp(value: Any) -> float:
        """
        Converts a 'created_at' filter value (ISO-format string or datetime) into a Unix timestamp.
        Naive values are interpreted as UTC; values that carry an explicit offset keep it.
        """
        date = value if isinstance(value, dt.datetime) else dt.datetime.fromisoformat(str(value))
        if date.tzinfo is None:
            date = date.replace(tzinfo=dt.timezone.utc)
        return date.timestamp()

    def insert(self, query: Insert):
        """
        Executes an INSERT SQL query represented by an ASTNode object and posts a message to a Slack channel.

        Args:
            query (Insert): An ASTNode object representing the SQL query to be executed.

        Raises:
            ValueError: If the 'channel_id' or 'text' parameters are not provided.
            SlackApiError: If an error occurs when posting the message to the Slack channel.
        """
        client = self.handler.connect()

        # Get column names and values from the query; each row posts one message.
        columns = [col.name for col in query.columns]
        for row in query.values:
            params = dict(zip(columns, row))

            # Check if required parameters are provided.
            if "channel_id" not in params or "text" not in params:
                raise ValueError(
                    "To insert data into Slack, you need to provide the 'channel_id' and 'text' parameters."
                )

            try:
                client.chat_postMessage(channel=params["channel_id"], text=params["text"])
            except SlackApiError as slack_error:
                logger.error(
                    f"Error posting message to Slack channel '{params['channel_id']}': {slack_error.response['error']}"
                )
                raise

    def _get_message_identifiers(self, where: Any) -> Dict[Text, Text]:
        """
        Extracts the 'channel' and 'ts' parameters identifying a single message from the WHERE clause of an
        UPDATE or DELETE query. Missing parameters are simply absent from the returned dict.

        Raises:
            ValueError:
                - If a column other than 'channel_id' or 'ts' is used.
                - If an operator other than '=' is used.
                - If the channel ID provided is not found.
        """
        params = {}
        for op, arg1, arg2 in extract_comparison_conditions(where):
            if arg1 not in ("channel_id", "ts"):
                raise ValueError(f"Unsupported column '{arg1}'")
            if op != "=":
                raise ValueError(f"Unsupported operator '{op}' for column '{arg1}'")

            if arg1 == "channel_id":
                # get_channel raises ValueError if the channel does not exist.
                SlackConversationsTable(self.handler).get_channel(arg2)
                params["channel"] = arg2
            else:
                # Keep 'ts' as a string: Slack message timestamps are string IDs (e.g. '1712345678.123450'),
                # and a float round-trip would drop trailing zeros and no longer match the message.
                params["ts"] = str(arg2)

        return params

    def update(self, query: Update):
        """
        Executes an UPDATE SQL query represented by an ASTNode object and updates a message in a Slack channel.

        Args:
            query (Update): An ASTNode object representing the SQL query to be executed.

        Raises:
            ValueError:
                - If the 'channel_id', 'ts', or 'text' parameters are not provided.
                - If an unsupported operator is used for the columns.
                - If an unsupported column is used.
                - If the channel ID provided is not found.
            SlackApiError: If an error occurs when updating the message in the Slack channel.
        """
        client = self.handler.connect()

        # Build the parameters for the call to the Slack API from the WHERE clause.
        params = self._get_message_identifiers(query.where)

        # Extract the update columns and values; only 'text' can be updated.
        for col, val in query.update_columns.items():
            if col != "text":
                raise ValueError(f"Unsupported column '{col}'")
            params[col] = str(val).strip("'")

        # Check if required parameters are provided.
        if "channel" not in params or "ts" not in params or "text" not in params:
            raise ValueError(
                "To update a message in Slack, you need to provide the 'channel', 'ts', and 'text' parameters."
            )

        try:
            client.chat_update(channel=params["channel"], ts=params["ts"], text=params["text"].strip())
        except SlackApiError as slack_error:
            logger.error(
                f"Error updating message in Slack channel '{params['channel']}' with timestamp '{params['ts']}': {slack_error.response['error']}"
            )
            raise

    def delete(self, query: Delete):
        """
        Executes a DELETE SQL query represented by an ASTNode object and deletes a message from a Slack channel.

        Args:
            query (Delete): An ASTNode object representing the SQL query to be executed.

        Raises:
            ValueError:
                - If the 'channel_id' or 'ts' parameters are not provided.
                - If an unsupported operator is used for the columns.
                - If an unsupported column is used.
                - If the channel ID provided is not found.
            SlackApiError: If an error occurs when deleting the message from the Slack channel.
        """
        client = self.handler.connect()

        # Build the parameters for the call to the Slack API from the WHERE clause.
        params = self._get_message_identifiers(query.where)

        # Check if required parameters are provided.
        if "channel" not in params or "ts" not in params:
            raise ValueError("To delete a message from Slack, you need to provide the 'channel' and 'ts' parameters.")

        try:
            client.chat_delete(channel=params["channel"], ts=params["ts"])
        except SlackApiError as slack_error:
            logger.error(
                f"Error deleting message in Slack channel '{params['channel']}' with timestamp '{params['ts']}': {slack_error.response['error']}"
            )
            raise

    def get_columns(self) -> List[Text]:
        """
        Retrieves the attributes (columns) of the Slack messages.

        Returns:
            List[str]: The list of columns.
        """
        return [
            "channel_id",
            "channel",
            "client_msg_id",
            "type",
            "subtype",
            "ts",
            "created_at",
            "user",
            "text",
            "attachments",
            "files",
            "reactions",
            "thread_ts",
            "reply_count",
            "reply_users_count",
            "latest_reply",
            "reply_users",
        ]
class SlackThreadsTable(APIResource):
    """
    This is the table abstraction for interacting with threads in Slack conversations.
    """

    def list(
        self, conditions: List[FilterCondition] = None, limit: int = None, sort: List[SortColumn] = None, **kwargs: Any
    ) -> pd.DataFrame:
        """
        Retrieves a list of messages in a thread based on the specified conditions.

        `channel_id` and `thread_ts` are required parameters to retrieve messages from a thread.

        Messages are retrieved as follows for a given thread:
            - If the provided limit is greater than 1000, no limit to the API call is provided and the results are paginated until the limit is reached.
            - Otherwise, the provided limit or a default limit of 1000 is used when making the API call.

        Therefore, if a user is to retrieve more than 1000 messages, the limit should be set to a value greater than 1000.

        The above is designed to prevent rate limiting by the Slack API.

        Conditions on columns other than 'channel_id' and 'thread_ts' are ignored here (left unapplied).

        Args:
            conditions (List[FilterCondition]): The conditions to filter the messages. May be None.
            limit (int): The limit of the messages to return.
            sort (List[SortColumn]): The columns to sort the messages by.
            kwargs (Any): Arbitrary keyword arguments.

        Raises:
            ValueError:
                - If the 'channel_id' or 'thread_ts' parameters are not provided.
                - If an unsupported operator is used for the columns.
                - If the channel ID provided is not found.
            SlackApiError: If an error occurs when getting the messages from the Slack API.

        Returns:
            pd.DataFrame: The messages in the thread.
        """
        client = self.handler.connect()

        # Build the parameters for the call to the Slack API.
        params = {}
        channel = None
        for condition in conditions or []:
            value = condition.value
            op = condition.op

            # Handle the column 'channel_id'.
            if condition.column == "channel_id":
                if op != FilterOperator.EQUAL:
                    raise ValueError(f"Unsupported operator '{op}' for column 'channel_id'")

                # get_channel raises ValueError if the channel does not exist.
                channel = SlackConversationsTable(self.handler).get_channel(value)
                params["channel"] = value
                condition.applied = True

            # Handle the column 'thread_ts'.
            elif condition.column == "thread_ts":
                if op != FilterOperator.EQUAL:
                    raise ValueError(f"Unsupported operator '{op}' for column 'thread_ts'")

                params["ts"] = value

        if "channel" not in params or "ts" not in params:
            raise ValueError(
                "To retrieve data from Slack, you need to provide the 'channel_id' and 'thread_ts' parameters."
            )

        # Retrieve the messages from the Slack API.
        try:
            # If the limit is greater than 1000, paginate the results until the limit is reached.
            if limit and limit > 1000:
                response = client.conversations_replies(**params)
                messages = list(response["messages"])

                # Follow the cursor until enough messages are collected or there are no more pages.
                # Every page request must repeat 'channel' and 'ts'; 'response_metadata' may be missing or None.
                while len(messages) < limit:
                    cursor = (response.get("response_metadata") or {}).get("next_cursor")
                    if not cursor:
                        break
                    response = client.conversations_replies(cursor=cursor, **params)
                    messages.extend(response["messages"])

                messages = messages[:limit]
            # Otherwise, use the provided limit or a default limit of 1000.
            else:
                params["limit"] = limit if limit else 1000
                response = client.conversations_replies(**params)
                messages = response["messages"]
        except SlackApiError as slack_error:
            logger.error(f"Error getting messages: {slack_error.response['error']}")
            raise

        result = pd.DataFrame(messages, columns=self.get_columns())

        # Copy so the column assignments below act on an independent frame, not a filtered view.
        result = result[result["text"].notnull()].copy()

        # Add the channel ID and name to the result.
        result["channel_id"] = params["channel"]
        result["channel_name"] = channel.get("name")

        # Sort the messages by the specified columns.
        if sort:
            result.sort_values(by=[col.column for col in sort], ascending=[col.ascending for col in sort], inplace=True)

        return result

    def insert(self, query: Insert):
        """
        Executes an INSERT SQL query represented by an ASTNode object and posts a message to a Slack thread.

        Args:
            query (Insert): An ASTNode object representing the SQL query to be executed.

        Raises:
            ValueError: If the 'channel_id', 'text', or 'thread_ts' parameters are not provided.
            SlackApiError: If an error occurs when posting the message to the Slack thread.
        """
        client = self.handler.connect()

        # Get column names and values from the query; each row posts one reply.
        columns = [col.name for col in query.columns]
        for row in query.values:
            params = dict(zip(columns, row))

            # Check if required parameters are provided.
            if "channel_id" not in params or "text" not in params or "thread_ts" not in params:
                raise ValueError(
                    "To insert data into Slack, you need to provide the 'channel_id', 'text', and 'thread_ts' parameters."
                )

            try:
                client.chat_postMessage(
                    channel=params["channel_id"], text=params["text"], thread_ts=params["thread_ts"]
                )
            except SlackApiError as slack_error:
                logger.error(
                    f"Error posting message to Slack channel '{params['channel_id']}': {slack_error.response['error']}"
                )
                raise

    def get_columns(self) -> List[Text]:
        """
        Retrieves the attributes (columns) of the Slack threads.

        Returns:
            List[Text]: The list of columns.
        """
        return [
            "channel_id",
            "channel_name",
            "type",
            "user",
            "text",
            "ts",
            "client_msg_id",
            "thread_ts",
            "parent_user_id",
            "reply_count",
            "reply_users_count",
            "latest_reply",
            "reply_users",
        ]
class SlackUsersTable(APIResource):
    """
    This is the table abstraction for interacting with users in Slack.
    """

    def list(self, conditions: List[FilterCondition] = None, limit: int = None, **kwargs: Any) -> pd.DataFrame:
        """
        Retrieves a list of users based on the specified conditions.
        Users are retrieved as follows:
            - If the provided limit is greater than 1000, no limit to the API call is provided and the results are paginated until the limit is reached.
            - Otherwise, the provided limit or a default limit of 1000 is used when making the API call.

        Therefore, if a user is to retrieve more than 1000 users, the limit should be set to a value greater than 1000.

        The above is designed to prevent rate limiting by the Slack API.

        Args:
            conditions (List[FilterCondition]): The conditions to filter the users. Not pushed down to the API;
                none are marked as applied.
            limit (int): The limit of the users to return.
            kwargs (Any): Arbitrary keyword arguments.

        Raises:
            SlackApiError: If an error occurs when getting the users from the Slack API.

        Returns:
            pd.DataFrame: The users, restricted to the columns from `get_columns`.
        """
        client = self.handler.connect()

        # Retrieve the users from the Slack API.
        try:
            # If the limit is greater than 1000, paginate the results until the limit is reached.
            if limit and limit > 1000:
                response = client.users_list()
                users = list(response["members"])

                # Follow the cursor until enough users are collected or there are no more pages.
                # 'response_metadata' may be missing or None, so read it defensively.
                while len(users) < limit:
                    cursor = (response.get("response_metadata") or {}).get("next_cursor")
                    if not cursor:
                        break
                    response = client.users_list(cursor=cursor)
                    users.extend(response["members"])

                users = users[:limit]
            # Otherwise, use the provided limit or a default limit of 1000.
            else:
                response = client.users_list(limit=limit if limit else 1000)
                users = response["members"]
        except SlackApiError as slack_error:
            logger.error(f"Error getting users: {slack_error.response['error']}")
            raise

        return pd.DataFrame(users, columns=self.get_columns())

    def get_columns(self) -> List[Text]:
        """
        Retrieves the attributes (columns) of the Slack users.

        Returns:
            List[Text]: The list of columns.
        """
        return ["id", "name", "real_name"]