Coverage for mindsdb / integrations / handlers / rocket_chat_handler / rocket_chat_handler.py: 0%
67 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pandas as pd
3from rocketchat_API.rocketchat import RocketChat
5from mindsdb.integrations.handlers.rocket_chat_handler.rocket_chat_tables import (
6 ChannelMessagesTable, ChannelsTable, DirectsTable, DirectMessagesTable, UsersTable)
7from mindsdb.integrations.libs.api_handler import APIChatHandler
8from mindsdb.integrations.libs.response import (
9 HandlerStatusResponse as StatusResponse,
10 HandlerResponse as Response,
11)
12from mindsdb.utilities import log
13from mindsdb_sql_parser import parse_sql
# Module-level logger, named after this module so handler messages can be
# traced back to the Rocket Chat integration.
logger = log.getLogger(__name__)
class RocketChatHandler(APIChatHandler):
    """A class for handling connections and interactions with the Rocket Chat API.

    Attributes:
        username (str): Rocket Chat username to use for authentication.
        password (str): Rocket Chat password to use for authentication.
        auth_token (str): Rocket Chat authorization token to use for all API requests.
        auth_user_id (str): Rocket Chat user ID to associate with all API requests.
        domain (str): Path to Rocket Chat domain to use (e.g. https://mindsdb.rocket.chat).
        client (RocketChat): The `RocketChat` client object for interacting with the Rocket Chat API.
        is_connected (bool): Whether or not the API client is connected to the Rocket Chat API.
    """

    def __init__(self, name: str = None, **kwargs):
        """Registers all API tables and prepares the handler for an API connection.

        Args:
            name (str): The handler name to use.
            **kwargs: Must contain `connection_data`, a dict with a `domain`
                entry plus either `token` and `user_id`, or `username` and
                `password`. Token credentials win when both pairs are given.

        Raises:
            ValueError: If `domain` is missing, or if neither complete
                credential pair is present.
        """
        super().__init__(name)
        self.username = None
        self.password = None
        self.auth_token = None
        self.auth_user_id = None
        self.domain = None

        # `or {}` also covers an explicit `connection_data=None`, which would
        # otherwise fail the membership tests below with a TypeError instead
        # of the intended ValueError.
        args = kwargs.get('connection_data') or {}
        if 'domain' not in args:
            raise ValueError('Must include Rocket Chat "domain" to read and write messages')
        self.domain = args['domain']

        if 'token' in args and 'user_id' in args:
            self.auth_token = args['token']
            self.auth_user_id = args['user_id']
        elif 'username' in args and 'password' in args:
            self.username = args['username']
            self.password = args['password']
        else:
            raise ValueError('Need "token" and "user_id", or "username" and "password" to connect to Rocket Chat')

        # The client is created lazily by connect().
        self.client = None
        self.is_connected = False

        self._register_table('channels', ChannelsTable(self))
        self._register_table('channel_messages', ChannelMessagesTable(self))
        self._register_table('directs', DirectsTable(self))
        self._register_table('direct_messages', DirectMessagesTable(self))
        self._register_table('users', UsersTable(self))

    def get_chat_config(self):
        """Returns the chat configuration for this handler.

        The returned dict has two sections:
            polling: polls the `directs` table by message count, using `_id`
                as the chat id column and `msgs` as the count column.
            chat_table: names the `direct_messages` table and its chat id,
                username, text and time columns.
        """
        return {
            'polling': {
                'type': 'message_count',
                'table': 'directs',
                'chat_id_col': '_id',
                'count_col': 'msgs'
            },
            'chat_table': {
                'name': 'direct_messages',
                'chat_id_col': 'room_id',
                'username_col': 'username',
                'text_col': 'text',
                'time_col': 'sent_at',
            }
        }

    def get_my_user_name(self):
        """Returns the `username` field of the `me` API response."""
        info = self.call_api('me')
        return info['username']

    def connect(self):
        """Creates a new Rocket Chat API client if needed and sets it as the client to use for requests.

        Returns newly created Rocket Chat API client, or current client if already set.
        """
        if self.is_connected and self.client is not None:
            return self.client

        # Only one credential pair is set in __init__; the other pair is
        # passed through as None.
        self.client = RocketChat(
            user=self.username,
            password=self.password,
            auth_token=self.auth_token,
            user_id=self.auth_user_id,
            server_url=self.domain
        )

        self.is_connected = True
        return self.client

    def check_connection(self) -> StatusResponse:
        """Checks connection to Rocket Chat API by requesting the current user.

        Returns StatusResponse indicating whether or not the handler is connected.
        On failure the error is logged, stored on the response as text, and
        `is_connected` is reset so the next connect() builds a fresh client.
        """
        response = StatusResponse(False)

        try:
            # connect() only constructs the client object; with token
            # credentials the library may not contact the server at all, so
            # issue an authenticated request to actually validate them.
            self.call_api('me')
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Rocket Chat API: {e}!')
            # Store the message text rather than the exception object.
            response.error_message = str(e)
            self.is_connected = False

        return response

    def native_query(self, query: str = None) -> Response:
        """Parses the raw SQL string and runs it through the handler's `query` method.

        Args:
            query (str): SQL text to parse and execute.
        """
        ast = parse_sql(query)
        return self.query(ast)

    def call_api(self, method_name: str = None, *args, **kwargs) -> pd.DataFrame:
        """Calls the Rocket Chat API method with the given params.

        Returns the decoded JSON body of the API response.
        NOTE(review): the return annotation says DataFrame, but the value
        returned is whatever `.json()` yields (get_my_user_name indexes it
        like a dict) - annotation kept as-is for interface stability.

        Args:
            method_name (str): Name of the client method to call.
            *args: Positional arguments forwarded to the client method.
            **kwargs: Keyword arguments forwarded to the client method.

        Raises:
            Whatever `raise_for_status()` raises when the response reports
            an HTTP error status.
        """
        client = self.connect()
        method = getattr(client, method_name)
        api_response = method(*args, **kwargs)

        # No-op for successful responses, raises for error statuses.
        api_response.raise_for_status()
        return api_response.json()