Coverage for mindsdb / integrations / handlers / rocket_chat_handler / rocket_chat_handler.py: 0%

67 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2 

3from rocketchat_API.rocketchat import RocketChat 

4 

5from mindsdb.integrations.handlers.rocket_chat_handler.rocket_chat_tables import ( 

6 ChannelMessagesTable, ChannelsTable, DirectsTable, DirectMessagesTable, UsersTable) 

7from mindsdb.integrations.libs.api_handler import APIChatHandler 

8from mindsdb.integrations.libs.response import ( 

9 HandlerStatusResponse as StatusResponse, 

10 HandlerResponse as Response, 

11) 

12from mindsdb.utilities import log 

13from mindsdb_sql_parser import parse_sql 

14 

15logger = log.getLogger(__name__) 

16 

17 

18class RocketChatHandler(APIChatHandler): 

19 """A class for handling connections and interactions with the Rocket Chat API. 

20 

21 Attributes: 

22 username (str): Rocket Chat username to use for authentication. 

23 password (str): Rocket Chat username to use for authentication. 

24 auth_token (str): Rocket Chat authorization token to use for all API requests. 

25 auth_user_id (str): Rocket Chat user ID to associate with all API requests 

26 domain (str): Path to Rocket Chat domain to use (e.g. https://mindsdb.rocket.chat). 

27 client (RocketChatClient): The `RocketChatClient` object for interacting with the Rocket Chat API. 

28 is_connected (bool): Whether or not the API client is connected to the Rocket Chat API. 

29 """ 

30 

31 def __init__(self, name: str = None, **kwargs): 

32 """Registers all API tables and prepares the handler for an API connection. 

33 

34 Args: 

35 name: (str): The handler name to use 

36 """ 

37 super().__init__(name) 

38 self.username = None 

39 self.password = None 

40 self.auth_token = None 

41 self.auth_user_id = None 

42 self.domain = None 

43 

44 args = kwargs.get('connection_data', {}) 

45 if 'domain' not in args: 

46 raise ValueError('Must include Rocket Chat "domain" to read and write messages') 

47 self.domain = args['domain'] 

48 

49 if 'token' in args and 'user_id' in args: 

50 self.auth_token = args['token'] 

51 self.auth_user_id = args['user_id'] 

52 elif 'username' in args and 'password' in args: 

53 self.username = args['username'] 

54 self.password = args['password'] 

55 else: 

56 raise ValueError('Need "token" and "user_id", or "username" and "password" to connect to Rocket Chat') 

57 

58 self.client = None 

59 self.is_connected = False 

60 

61 self._register_table('channels', ChannelsTable(self)) 

62 

63 self._register_table('channel_messages', ChannelMessagesTable(self)) 

64 

65 self._register_table('directs', DirectsTable(self)) 

66 

67 self._register_table('direct_messages', DirectMessagesTable(self)) 

68 

69 self._register_table('users', UsersTable(self)) 

70 

71 def get_chat_config(self): 

72 params = { 

73 'polling': { 

74 'type': 'message_count', 

75 'table': 'directs', 

76 'chat_id_col': '_id', 

77 'count_col': 'msgs' 

78 }, 

79 'chat_table': { 

80 'name': 'direct_messages', 

81 'chat_id_col': 'room_id', 

82 'username_col': 'username', 

83 'text_col': 'text', 

84 'time_col': 'sent_at', 

85 } 

86 } 

87 return params 

88 

89 def get_my_user_name(self): 

90 info = self.call_api('me') 

91 return info['username'] 

92 

93 def connect(self): 

94 """Creates a new Rocket Chat API client if needed and sets it as the client to use for requests. 

95 

96 Returns newly created Rocket Chat API client, or current client if already set. 

97 """ 

98 if self.is_connected and self.client is not None: 

99 return self.client 

100 

101 self.client = RocketChat( 

102 user=self.username, 

103 password=self.password, 

104 auth_token=self.auth_token, 

105 user_id=self.auth_user_id, 

106 server_url=self.domain 

107 ) 

108 

109 self.is_connected = True 

110 return self.client 

111 

112 def check_connection(self) -> StatusResponse: 

113 """Checks connection to Rocket Chat API by sending a ping request. 

114 

115 Returns StatusResponse indicating whether or not the handler is connected. 

116 """ 

117 

118 response = StatusResponse(False) 

119 

120 try: 

121 self.connect() 

122 response.success = True 

123 except Exception as e: 

124 logger.error(f'Error connecting to Rocket Chat API: {e}!') 

125 response.error_message = e 

126 

127 if response.success is False: 

128 self.is_connected = False 

129 return response 

130 

131 def native_query(self, query: str = None) -> Response: 

132 ast = parse_sql(query) 

133 return self.query(ast) 

134 

135 def call_api(self, method_name: str = None, *args, **kwargs) -> pd.DataFrame: 

136 """Calls the Rocket Chat API method with the given params. 

137 

138 Returns results as a pandas DataFrame. 

139 

140 Args: 

141 method_name (str): Method name to call 

142 params (Dict): Params to pass to the API call 

143 """ 

144 client = self.connect() 

145 

146 method = getattr(client, method_name) 

147 

148 messages_response = method(*args, **kwargs) 

149 

150 if not messages_response.ok: 

151 messages_response.raise_for_status() 

152 return messages_response.json()