Coverage for mindsdb / integrations / handlers / ms_teams_handler / ms_teams_handler.py: 69%

111 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Dict, Text, Callable, Union 

2 

3from botbuilder.schema import Activity, ActivityTypes 

4from botbuilder.schema import ChannelAccount 

5from botframework.connector import ConnectorClient 

6from botframework.connector.auth import MicrosoftAppCredentials 

7import msal 

8from requests.exceptions import RequestException 

9 

10from mindsdb.integrations.handlers.ms_teams_handler.ms_graph_api_teams_client import ( 

11 MSGraphAPIBaseClient, 

12 MSGraphAPITeamsApplicationPermissionsClient, 

13 MSGraphAPITeamsDelegatedPermissionsClient 

14) 

15from mindsdb.integrations.handlers.ms_teams_handler.ms_teams_tables import ( 

16 ChannelsTable, ChannelMessagesTable, ChatsTable, ChatMessagesTable, TeamsTable 

17) 

18from mindsdb.integrations.libs.response import ( 

19 HandlerStatusResponse as StatusResponse, 

20) 

21from mindsdb.integrations.libs.api_handler import APIChatHandler 

22from mindsdb.integrations.utilities.handlers.auth_utilities.microsoft import ( 

23 MSGraphAPIApplicationPermissionsManager, 

24 MSGraphAPIDelegatedPermissionsManager 

25) 

26from mindsdb.integrations.utilities.handlers.auth_utilities.exceptions import AuthException 

27from mindsdb.interfaces.chatbot.types import ChatBotMessage 

28from mindsdb.utilities import log 

29 

30 

31logger = log.getLogger(__name__) 

32 

33 

34def chatbot_only(func): 

35 def wrapper(self, *args, **kwargs): 

36 if self.connection_data.get('opertion_mode', 'datasource') != 'chatbot': 

37 raise ValueError("This connection can only be used as a data source. Please use a chatbot connection by setting the 'mode' parameter to 'chat'.") 

38 return func(self, *args, **kwargs) 

39 return wrapper 

40 

41 

42class MSTeamsHandler(APIChatHandler): 

43 """ 

44 This handler handles the connection and execution of SQL statements on Microsoft Teams via the Microsoft Graph API. 

45 It is also responsible for handling the chatbot functionality. 

46 """ 

47 

48 name = 'teams' 

49 

50 def __init__(self, name: str, **kwargs): 

51 """ 

52 Initializes the handler. 

53 

54 Args: 

55 name (str): name of particular handler instance 

56 **kwargs: arbitrary keyword arguments. 

57 """ 

58 super().__init__(name) 

59 

60 connection_data = kwargs.get("connection_data", {}) 

61 self.connection_data = connection_data 

62 self.handler_storage = kwargs['handler_storage'] 

63 self.kwargs = kwargs 

64 

65 self.connection = None 

66 self.is_connected = False 

67 

68 self.service_url = None 

69 self.channel_id = None 

70 self.bot_id = None 

71 self.conversation_id = None 

72 

73 def connect(self) -> Union[MicrosoftAppCredentials, MSGraphAPIBaseClient]: 

74 """ 

75 Establishes a connection to the Microsoft Teams registered app or the Microsoft Graph API. 

76 

77 Returns: 

78 Union[MicrosoftAppCredentials, MSGraphAPITeamsDelegatedPermissionsClient]: A connection object to the Microsoft Teams registered app or the Microsoft Graph API. 

79 """ 

80 if self.is_connected: 80 ↛ 81line 80 didn't jump to line 81 because the condition on line 80 was never true

81 return self.connection 

82 

83 # The default operation mode is 'datasource'. This is used for data source connections. 

84 operation_mode = self.connection_data.get('operation_mode', 'datasource') 

85 if operation_mode == 'datasource': 85 ↛ 139line 85 didn't jump to line 139 because the condition on line 85 was always true

86 # Initialize the token cache. 

87 cache = msal.SerializableTokenCache() 

88 

89 # Load the cache from file if it exists. 

90 cache_file = 'cache.bin' 

91 try: 

92 cache_content = self.handler_storage.file_get(cache_file) 

93 except FileNotFoundError: 

94 cache_content = None 

95 

96 if cache_content: 

97 cache.deserialize(cache_content) 

98 

99 # The default permissions mode is 'delegated'. This requires the user to sign in. 

100 permission_mode = self.connection_data.get('permission_mode', 'delegated') 

101 if permission_mode == 'delegated': 101 ↛ 110line 101 didn't jump to line 110 because the condition on line 101 was always true

102 permissions_manager = MSGraphAPIDelegatedPermissionsManager( 

103 client_id=self.connection_data['client_id'], 

104 client_secret=self.connection_data['client_secret'], 

105 tenant_id=self.connection_data['tenant_id'], 

106 cache=cache, 

107 code=self.connection_data.get('code') 

108 ) 

109 

110 elif permission_mode == 'application': 

111 permissions_manager = MSGraphAPIApplicationPermissionsManager( 

112 client_id=self.connection_data['client_id'], 

113 client_secret=self.connection_data['client_secret'], 

114 tenant_id=self.connection_data['tenant_id'], 

115 cache=cache 

116 ) 

117 

118 else: 

119 raise ValueError("The supported permission modes are 'delegated' and 'application'.") 

120 

121 access_token = permissions_manager.get_access_token() 

122 

123 # Save the cache back to file if it has changed. 

124 if cache.has_state_changed: 

125 self.handler_storage.file_set(cache_file, cache.serialize().encode('utf-8')) 

126 

127 if permission_mode == 'delegated': 127 ↛ 131line 127 didn't jump to line 131 because the condition on line 127 was always true

128 self.connection = MSGraphAPITeamsDelegatedPermissionsClient(access_token) 

129 

130 else: 

131 self.connection = MSGraphAPITeamsApplicationPermissionsClient(access_token) 

132 

133 self._register_table('channels', ChannelsTable(self)) 

134 self._register_table('channel_messages', ChannelMessagesTable(self)) 

135 self._register_table('chats', ChatsTable(self)) 

136 self._register_table('chat_messages', ChatMessagesTable(self)) 

137 self._register_table('teams', TeamsTable(self)) 

138 

139 elif operation_mode == 'chatbot': 

140 self.connection = MicrosoftAppCredentials( 

141 self.connection_data['app_id'], 

142 self.connection_data['app_password'] 

143 ) 

144 

145 else: 

146 raise ValueError("The supported operation modes are 'datasource' and 'chatbot'.") 

147 

148 self.is_connected = True 

149 

150 return self.connection 

151 

152 def check_connection(self) -> StatusResponse: 

153 """ 

154 Checks the status of the connection to Microsoft Teams. 

155 

156 Returns: 

157 StatusResponse: An object containing the success status and an error message if an error occurs. 

158 """ 

159 response = StatusResponse(False) 

160 

161 try: 

162 connection = self.connect() 

163 # A connection check against the Microsoft Graph API is run if the connection is in 'datasource' mode. 

164 if self.connection_data.get('operation_mode', 'datasource') == 'datasource' and connection.check_connection(): 

165 response.success = True 

166 response.copy_storage = True 

167 else: 

168 raise RequestException("Connection check failed!") 

169 except (ValueError, RequestException) as known_error: 

170 logger.error(f'Connection check to Microsoft Teams failed, {known_error}!') 

171 response.error_message = str(known_error) 

172 except AuthException as error: 

173 response.error_message = str(error) 

174 response.redirect_url = error.auth_url 

175 return response 

176 except Exception as unknown_error: 

177 logger.error(f'Connection check to Microsoft Teams failed due to an unknown error, {unknown_error}!') 

178 response.error_message = str(unknown_error) 

179 

180 self.is_connected = response.success 

181 

182 return response 

183 

184 @chatbot_only 

185 def get_chat_config(self) -> Dict: 

186 """ 

187 Gets the configuration for the chatbot. 

188 This method is required for the implementation of the chatbot. 

189 

190 Returns: 

191 Dict: The configuration for the chatbot. 

192 """ 

193 params = { 

194 'polling': { 

195 'type': 'webhook' 

196 } 

197 } 

198 

199 return params 

200 

201 @chatbot_only 

202 def get_my_user_name(self) -> Text: 

203 """ 

204 Gets the name of the signed in user. 

205 This method is required for the implementation of the chatbot. 

206 

207 Returns: 

208 Text: The name of the signed in user. 

209 """ 

210 return None 

211 

212 @chatbot_only 

213 def on_webhook(self, request: Dict, callback: Callable) -> None: 

214 """ 

215 Handles a webhook request. 

216 

217 Args: 

218 request (Dict): The request data. 

219 callback (Callable): The callback function to call. 

220 """ 

221 self.service_url = request["serviceUrl"] 

222 self.channel_id = request["channelId"] 

223 self.bot_id = request["from"]["id"] 

224 self.conversation_id = request["conversation"]["id"] 

225 

226 chat_bot_message = ChatBotMessage( 

227 ChatBotMessage.Type.DIRECT, 

228 text=request["text"], 

229 user=request["from"]["id"], 

230 destination=request["recipient"]["id"] 

231 ) 

232 

233 callback( 

234 chat_id=request['conversation']['id'], 

235 message=chat_bot_message 

236 ) 

237 

238 @chatbot_only 

239 def respond(self, message: ChatBotMessage) -> None: 

240 """ 

241 Sends a response to the chatbot. 

242 

243 Args: 

244 message (ChatBotMessage): The message to send 

245 

246 Raises: 

247 ValueError: If the chatbot message is not of type DIRECT. 

248 

249 Returns: 

250 None 

251 """ 

252 credentials = self.connect() 

253 

254 connector = ConnectorClient(credentials, base_url=self.service_url) 

255 connector.conversations.send_to_conversation( 

256 self.conversation_id, 

257 Activity( 

258 type=ActivityTypes.message, 

259 channel_id=self.channel_id, 

260 recipient=ChannelAccount( 

261 id=message.destination 

262 ), 

263 from_property=ChannelAccount( 

264 id=self.bot_id 

265 ), 

266 text=message.text 

267 ) 

268 )