Coverage for mindsdb / integrations / handlers / ms_teams_handler / ms_teams_handler.py: 69%
111 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Dict, Text, Callable, Union
3from botbuilder.schema import Activity, ActivityTypes
4from botbuilder.schema import ChannelAccount
5from botframework.connector import ConnectorClient
6from botframework.connector.auth import MicrosoftAppCredentials
7import msal
8from requests.exceptions import RequestException
10from mindsdb.integrations.handlers.ms_teams_handler.ms_graph_api_teams_client import (
11 MSGraphAPIBaseClient,
12 MSGraphAPITeamsApplicationPermissionsClient,
13 MSGraphAPITeamsDelegatedPermissionsClient
14)
15from mindsdb.integrations.handlers.ms_teams_handler.ms_teams_tables import (
16 ChannelsTable, ChannelMessagesTable, ChatsTable, ChatMessagesTable, TeamsTable
17)
18from mindsdb.integrations.libs.response import (
19 HandlerStatusResponse as StatusResponse,
20)
21from mindsdb.integrations.libs.api_handler import APIChatHandler
22from mindsdb.integrations.utilities.handlers.auth_utilities.microsoft import (
23 MSGraphAPIApplicationPermissionsManager,
24 MSGraphAPIDelegatedPermissionsManager
25)
26from mindsdb.integrations.utilities.handlers.auth_utilities.exceptions import AuthException
27from mindsdb.interfaces.chatbot.types import ChatBotMessage
28from mindsdb.utilities import log
31logger = log.getLogger(__name__)
def chatbot_only(func):
    """
    Decorator that restricts a handler method to connections running in 'chatbot' mode.

    The wrapped method's instance must expose a `connection_data` mapping. The mode is read from
    its 'operation_mode' key (the same key `connect` uses), defaulting to 'datasource'.

    Args:
        func (Callable): The method to guard.

    Raises:
        ValueError: When the guarded method is called and the connection is not in 'chatbot' mode.

    Returns:
        Callable: The guarded method.
    """
    # Imported locally so this block does not depend on a module-level import being present.
    import functools

    # Preserve the wrapped method's name and docstring for introspection and debugging.
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.connection_data.get('operation_mode', 'datasource') != 'chatbot':
            raise ValueError(
                "This connection can only be used as a data source. "
                "Please use a chatbot connection by setting the 'operation_mode' parameter to 'chatbot'."
            )
        return func(self, *args, **kwargs)
    return wrapper
class MSTeamsHandler(APIChatHandler):
    """
    This handler handles the connection and execution of SQL statements on Microsoft Teams via the Microsoft Graph API.
    It is also responsible for handling the chatbot functionality.

    The behaviour is selected by the 'operation_mode' connection parameter:
    'datasource' (the default) connects to the Microsoft Graph API and registers the Teams tables,
    'chatbot' builds the credentials of the registered Microsoft Teams app.
    """

    name = 'teams'

    def __init__(self, name: str, **kwargs):
        """
        Initializes the handler.

        Args:
            name (str): name of particular handler instance
            **kwargs: arbitrary keyword arguments. 'handler_storage' is required;
                'connection_data' is optional and defaults to an empty dict.

        Raises:
            KeyError: If 'handler_storage' is not supplied.
        """
        super().__init__(name)

        self.connection_data = kwargs.get("connection_data", {})
        self.handler_storage = kwargs['handler_storage']
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

        # Details of the most recent webhook request; set in on_webhook and read in respond.
        self.service_url = None
        self.channel_id = None
        self.bot_id = None
        self.conversation_id = None

    def connect(self) -> Union[MicrosoftAppCredentials, MSGraphAPIBaseClient]:
        """
        Establishes a connection to the Microsoft Teams registered app or the Microsoft Graph API.

        In 'datasource' mode an access token is obtained through the permissions manager matching
        the 'permission_mode' parameter ('delegated' by default, or 'application'), the token cache
        is persisted to handler storage when it changed, and the Teams tables are registered.
        In 'chatbot' mode the app credentials are built from 'app_id' and 'app_password'.

        Raises:
            ValueError: If 'operation_mode' or 'permission_mode' holds an unsupported value.
            KeyError: If a required connection parameter is missing.

        Returns:
            Union[MicrosoftAppCredentials, MSGraphAPIBaseClient]: The app credentials in 'chatbot' mode, or a
            Microsoft Graph API client (delegated or application permissions) in 'datasource' mode.
        """
        if self.is_connected:
            return self.connection

        # The default operation mode is 'datasource'. This is used for data source connections.
        operation_mode = self.connection_data.get('operation_mode', 'datasource')
        if operation_mode == 'datasource':
            # Initialize the token cache.
            cache = msal.SerializableTokenCache()

            # Load the cache from file if it exists.
            cache_file = 'cache.bin'
            try:
                cache_content = self.handler_storage.file_get(cache_file)
            except FileNotFoundError:
                cache_content = None

            if cache_content:
                cache.deserialize(cache_content)

            # The default permissions mode is 'delegated'. This requires the user to sign in.
            permission_mode = self.connection_data.get('permission_mode', 'delegated')
            if permission_mode == 'delegated':
                permissions_manager = MSGraphAPIDelegatedPermissionsManager(
                    client_id=self.connection_data['client_id'],
                    client_secret=self.connection_data['client_secret'],
                    tenant_id=self.connection_data['tenant_id'],
                    cache=cache,
                    code=self.connection_data.get('code')
                )

            elif permission_mode == 'application':
                permissions_manager = MSGraphAPIApplicationPermissionsManager(
                    client_id=self.connection_data['client_id'],
                    client_secret=self.connection_data['client_secret'],
                    tenant_id=self.connection_data['tenant_id'],
                    cache=cache
                )

            else:
                raise ValueError("The supported permission modes are 'delegated' and 'application'.")

            access_token = permissions_manager.get_access_token()

            # Save the cache back to file if it has changed.
            if cache.has_state_changed:
                self.handler_storage.file_set(cache_file, cache.serialize().encode('utf-8'))

            if permission_mode == 'delegated':
                self.connection = MSGraphAPITeamsDelegatedPermissionsClient(access_token)

            else:
                self.connection = MSGraphAPITeamsApplicationPermissionsClient(access_token)

            self._register_table('channels', ChannelsTable(self))
            self._register_table('channel_messages', ChannelMessagesTable(self))
            self._register_table('chats', ChatsTable(self))
            self._register_table('chat_messages', ChatMessagesTable(self))
            self._register_table('teams', TeamsTable(self))

        elif operation_mode == 'chatbot':
            self.connection = MicrosoftAppCredentials(
                self.connection_data['app_id'],
                self.connection_data['app_password']
            )

        else:
            raise ValueError("The supported operation modes are 'datasource' and 'chatbot'.")

        self.is_connected = True

        return self.connection

    def check_connection(self) -> StatusResponse:
        """
        Checks the status of the connection to Microsoft Teams.

        In 'datasource' mode the Microsoft Graph API client's own connection check is run.
        In 'chatbot' mode the check succeeds once `connect` has built the app credentials;
        no request is sent to verify them.

        Returns:
            StatusResponse: An object containing the success status and an error message if an error occurs.
            When authentication is required, the response also carries the URL to redirect the user to.
        """
        response = StatusResponse(False)

        try:
            connection = self.connect()
            # A connection check against the Microsoft Graph API is run if the connection is in 'datasource' mode.
            if self.connection_data.get('operation_mode', 'datasource') == 'datasource':
                if not connection.check_connection():
                    raise RequestException("Connection check failed!")
                response.success = True
                # Set only for data source connections, as in the original success path.
                response.copy_storage = True
            else:
                # connect() raises for unsupported modes, so this branch is the 'chatbot' mode.
                # The credentials object exposes no check of its own here, so a successful
                # connect() is treated as a successful check.
                response.success = True
        except (ValueError, RequestException) as known_error:
            logger.error(f'Connection check to Microsoft Teams failed, {known_error}!')
            response.error_message = str(known_error)
        except AuthException as error:
            # The user has to sign in first: hand back the URL and leave the connection state untouched.
            response.error_message = str(error)
            response.redirect_url = error.auth_url
            return response
        except Exception as unknown_error:
            logger.error(f'Connection check to Microsoft Teams failed due to an unknown error, {unknown_error}!')
            response.error_message = str(unknown_error)

        self.is_connected = response.success

        return response

    @chatbot_only
    def get_chat_config(self) -> Dict:
        """
        Gets the configuration for the chatbot.
        This method is required for the implementation of the chatbot.

        Returns:
            Dict: The configuration for the chatbot. Messages are received via a webhook.
        """
        params = {
            'polling': {
                'type': 'webhook'
            }
        }

        return params

    @chatbot_only
    def get_my_user_name(self) -> Union[Text, None]:
        """
        Gets the name of the signed in user.
        This method is required for the implementation of the chatbot.

        Returns:
            Union[Text, None]: Always None; this handler does not provide a user name.
        """
        return None

    @chatbot_only
    def on_webhook(self, request: Dict, callback: Callable) -> None:
        """
        Handles a webhook request.

        The service URL, channel, sender and conversation of the request are stored on the handler
        so that `respond` can address its reply, then the message is passed on to the callback.

        Args:
            request (Dict): The request data. Must contain 'serviceUrl', 'channelId', 'text',
                'from', 'recipient' and 'conversation' (the last three with an 'id').
            callback (Callable): The callback function to call with the chat ID and the message.

        Raises:
            KeyError: If a required field is missing from the request.
        """
        self.service_url = request["serviceUrl"]
        self.channel_id = request["channelId"]
        # NOTE(review): the bot ID is taken from the sender ('from') of the incoming request,
        # the same value that is passed as the message user below - confirm this is intended.
        self.bot_id = request["from"]["id"]
        self.conversation_id = request["conversation"]["id"]

        chat_bot_message = ChatBotMessage(
            ChatBotMessage.Type.DIRECT,
            text=request["text"],
            user=request["from"]["id"],
            destination=request["recipient"]["id"]
        )

        callback(
            chat_id=request['conversation']['id'],
            message=chat_bot_message
        )

    @chatbot_only
    def respond(self, message: ChatBotMessage) -> None:
        """
        Sends a response to the chatbot.

        The reply is sent to the conversation, channel and service URL recorded by the most
        recent call to `on_webhook`.

        Args:
            message (ChatBotMessage): The message to send

        Raises:
            ValueError: Raised by the `chatbot_only` guard when it does not consider the
                connection to be in chatbot mode.

        Returns:
            None
        """
        credentials = self.connect()

        connector = ConnectorClient(credentials, base_url=self.service_url)
        connector.conversations.send_to_conversation(
            self.conversation_id,
            Activity(
                type=ActivityTypes.message,
                channel_id=self.channel_id,
                recipient=ChannelAccount(
                    id=message.destination
                ),
                from_property=ChannelAccount(
                    id=self.bot_id
                ),
                text=message.text
            )
        )