Coverage for mindsdb / integrations / handlers / rocket_chat_handler / rocket_chat_tables.py: 0%
94 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pandas as pd
3from mindsdb.integrations.libs.api_handler import APITable
4from mindsdb.integrations.utilities.sql_utils import conditions_to_filter, project_dataframe, sort_dataframe
5from mindsdb_sql_parser import ast
8def message_to_dataframe_row(message: dict):
9 message['id'] = message['_id']
10 message['room_id'] = message['rid']
11 message['text'] = message['msg']
12 message['sent_at'] = message['ts']
14 if 'u' in message:
15 if 'username' in message['u']:
16 message['username'] = message['u']['username']
17 if 'name' in message['u']:
18 message['name'] = message['u']['name']
19 if 'bot' in message and 'i' in message['bot']:
20 message['bot_id'] = message['bot']['i']
21 return message
24class ChannelsTable(APITable):
25 def select(self, query: ast.Select) -> pd.DataFrame:
26 message_data = self.handler.call_api('channels_list')
27 df = pd.DataFrame(message_data['channels'])
28 df = project_dataframe(df, query.targets, self.get_columns())
30 return df
32 def get_columns(self):
33 """Gets all columns to be returned in pandas DataFrame responses"""
34 return [
35 '_id',
36 'name',
37 'usersCount',
38 'msgs',
39 ]
42class ChannelMessagesTable(APITable):
43 """Manages SELECT and INSERT operations for Rocket Chat messages."""
45 def select(self, query: ast.Select) -> pd.DataFrame:
46 """Selects message data from the Rocket Chat API and returns it as a pandas DataFrame.
48 Returns dataframe representing the Rocket Chat API results.
50 Args:
51 query (ast.Select): Given SQL SELECT query
52 """
53 filters = conditions_to_filter(query.where)
55 if 'room_id' not in filters:
56 raise NotImplementedError()
58 params = {}
60 if query.limit:
61 params['count'] = query.limit
63 # See Channel Messages endpoint:
64 # https://developer.rocket.chat/reference/api/rest-api/endpoints/core-endpoints/channels-endpoints/messages
65 message_data = self.handler.call_api('channels_history', filters['room_id'], **params)
67 # Only return the columns we need to.
68 message_rows = [message_to_dataframe_row(m) for m in message_data['messages']]
69 df = pd.DataFrame(message_rows)
71 df = sort_dataframe(df, query.order_by)
72 df = project_dataframe(df, query.targets, self.get_columns())
74 return df
76 def insert(self, query: ast.Insert):
77 """Posts a message using the Rocket Chat API.
79 Args:
80 query (ast.Insert): Given SQL INSERT query
81 """
82 # See Post Message endpoint:
83 # https://developer.rocket.chat/reference/api/rest-api/endpoints/core-endpoints/chat-endpoints/postmessage
84 column_names = [col.name for col in query.columns]
85 for insert_row in query.values:
86 insert_params = dict(zip(column_names, insert_row))
88 self.handler.call_api('chat_post_message', **insert_params)
90 def get_columns(self):
91 """Gets all columns to be returned in pandas DataFrame responses"""
92 return [
93 'id',
94 'room_id',
95 'bot_id',
96 'text',
97 'username',
98 'name',
99 'sent_at'
100 ]
103class DirectsTable(APITable):
104 def select(self, query: ast.Select) -> pd.DataFrame:
105 message_data = self.handler.call_api('im_list')
106 df = pd.DataFrame(message_data['ims'])
107 df = project_dataframe(df, query.targets, self.get_columns())
109 return df
111 def insert(self, query: ast.Insert):
112 column_names = [col.name for col in query.columns]
113 for insert_row in query.values:
114 insert_params = dict(zip(column_names, insert_row))
116 self.handler.call_api('im_create', **insert_params)
118 def get_columns(self):
119 """Gets all columns to be returned in pandas DataFrame responses"""
120 return [
121 '_id',
122 'usernames',
123 'usersCount',
124 'msgs',
125 ]
128class DirectMessagesTable(APITable):
130 def select(self, query: ast.Select) -> pd.DataFrame:
132 filters = conditions_to_filter(query.where)
134 if 'room_id' not in filters:
135 raise NotImplementedError()
137 params = {}
139 if query.limit:
140 params['count'] = query.limit
142 message_data = self.handler.call_api('im_history', filters['room_id'], **params)
144 message_rows = [message_to_dataframe_row(m) for m in message_data['messages']]
145 df = pd.DataFrame(message_rows)
146 df = sort_dataframe(df, query.order_by)
147 df = project_dataframe(df, query.targets, self.get_columns())
149 return df
151 def insert(self, query: ast.Insert):
153 column_names = [col.name for col in query.columns]
154 for insert_row in query.values:
155 insert_params = dict(zip(column_names, insert_row))
157 if 'username' in insert_params:
158 # resolve username
159 resp = self.handler.call_api('users_info', insert_params['username'])
160 if 'user' in resp:
161 insert_params['room_id'] = resp['user']['_id']
162 del insert_params['username']
163 else:
164 raise ValueError(f'User not found: {insert_params["username"]}')
166 self.handler.call_api('chat_post_message', **insert_params)
168 def get_columns(self):
169 """Gets all columns to be returned in pandas DataFrame responses"""
170 return [
171 'id',
172 'room_id',
173 'bot_id',
174 'text',
175 'username',
176 'name',
177 'sent_at'
178 ]
181class UsersTable(APITable):
182 def select(self, query: ast.Select) -> pd.DataFrame:
183 message_data = self.handler.call_api('users_list')
184 df = pd.DataFrame(message_data['users'])
185 df = project_dataframe(df, query.targets, self.get_columns())
187 return df
189 def get_columns(self):
190 """Gets all columns to be returned in pandas DataFrame responses"""
191 return [
192 '_id',
193 'username',
194 'name',
195 'status',
196 'active',
197 'type',
198 ]