Coverage for mindsdb / integrations / utilities / handlers / auth_utilities / microsoft / ms_graph_api_auth_utilities.py: 72%
64 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from abc import ABC, abstractmethod
2from typing import Dict, List, Text
4from flask import request
5import msal
7from mindsdb.integrations.utilities.handlers.auth_utilities.exceptions import AuthException
8from mindsdb.utilities import log
11logger = log.getLogger(__name__)
class MSGraphAPIPermissionsManager(ABC):
    """
    The base class for managing permissions for the Microsoft Graph API.

    It stores the application credentials, the token cache and the scopes, and builds the
    MSAL confidential client application. Subclasses implement `get_access_token`.
    """

    def __init__(
        self,
        client_id: Text,
        client_secret: Text,
        tenant_id: Text,
        cache: msal.SerializableTokenCache,
        scopes: List = None,
    ) -> None:
        """
        Initializes the permissions manager.

        Args:
            client_id (Text): The client ID of the application registered in Microsoft Entra ID.
            client_secret (Text): The client secret of the application registered in Microsoft Entra ID.
            tenant_id (Text): The tenant ID of the application registered in Microsoft Entra ID.
            cache (msal.SerializableTokenCache): The token cache for storing the access token.
            scopes (List): The scopes for the Microsoft Graph API.
                Defaults to ["https://graph.microsoft.com/.default"] when omitted or None.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.tenant_id = tenant_id
        self.cache = cache
        # A list literal used as the default value would be created once and shared by every
        # instance; build a fresh default list per instance instead.
        self.scopes = ["https://graph.microsoft.com/.default"] if scopes is None else scopes

    @abstractmethod
    def get_access_token(self) -> Text:
        """
        Retrieves an access token for the Microsoft Graph API.

        Returns:
            Text: The access token for the Microsoft Graph API.
        """
        pass

    def _get_msal_app(self) -> msal.ConfidentialClientApplication:
        """
        Returns an instance of the MSAL ConfidentialClientApplication.

        The application is created for the tenant-specific authority
        (https://login.microsoftonline.com/<tenant_id>) and is backed by the token cache
        passed to the constructor.

        Returns:
            msal.ConfidentialClientApplication: An instance of the MSAL ConfidentialClientApplication.
        """
        return msal.ConfidentialClientApplication(
            self.client_id,
            authority=f"https://login.microsoftonline.com/{self.tenant_id}",
            client_credential=self.client_secret,
            token_cache=self.cache,
        )
class MSGraphAPIDelegatedPermissionsManager(MSGraphAPIPermissionsManager):
    """
    The class for managing the delegated permissions for the Microsoft Graph API.
    """

    def __init__(
        self,
        client_id: Text,
        client_secret: Text,
        tenant_id: Text,
        cache: msal.SerializableTokenCache,
        scopes: List = None,
        code: Text = None,
    ) -> None:
        """
        Initializes the delegated permissions manager.

        Args:
            client_id (Text): The client ID of the application registered in Microsoft Entra ID.
            client_secret (Text): The client secret of the application registered in Microsoft Entra ID.
            tenant_id (Text): The tenant ID of the application registered in Microsoft Entra ID.
            cache (msal.SerializableTokenCache): The token cache for storing the access token.
            scopes (List): The scopes for the Microsoft Graph API.
                Defaults to ["https://graph.microsoft.com/.default"] when omitted or None.
            code (Text): The authentication code for acquiring the access token.
        """
        # Resolve the default here so that every instance gets its own list rather than
        # sharing one mutable default object between instances.
        if scopes is None:
            scopes = ["https://graph.microsoft.com/.default"]
        super().__init__(client_id, client_secret, tenant_id, cache, scopes)
        self.code = code
        self.redirect_uri = None
        self._set_redirect_uri()

    def _set_redirect_uri(self) -> None:
        """
        Sets the redirect URI based on the request origin.

        Outside of a request context the origin is treated as empty, so the redirect URI
        becomes just '/verify-auth'.

        Raises:
            AuthException: If the request origin could not be determined.
        """
        # Set the redirect URI based on the request origin.
        # If the request origin is 127.0.0.1 (localhost), replace it with localhost.
        # This is done because the only HTTP origin allowed in Microsoft Entra ID app registration is localhost.
        try:
            request_origin = request.headers.get('ORIGIN') or (request.scheme + '://' + request.host)
            if not request_origin:
                raise AuthException('Request origin could not be determined!')
        except RuntimeError:
            # if it is outside of request context (streaming in agent)
            request_origin = ''

        if 'http://127.0.0.1' in request_origin:
            request_origin = request_origin.replace('127.0.0.1', 'localhost')
        self.redirect_uri = request_origin + '/verify-auth'

    def get_access_token(self) -> Text:
        """
        Retrieves an access token for the Microsoft Graph API.
        If a valid access token is found in the cache, it is returned.
        Otherwise, the authentication flow is executed.

        Raises:
            AuthException: If authorisation is still required (no authentication code was provided)
                or the token request did not yield an access token.

        Returns:
            Text: The access token for the Microsoft Graph API.
        """
        # Check if a valid access token is already in the cache for the signed-in user.
        msal_app = self._get_msal_app()
        accounts = msal_app.get_accounts()

        if accounts:
            response = msal_app.acquire_token_silent(self.scopes, account=accounts[0])
            # acquire_token_silent returns None when the cache holds no usable token;
            # guard against it so we fall through to the authentication flow instead of
            # failing with a TypeError on the membership test.
            if response and "access_token" in response:
                return response['access_token']

        # If no valid access token is found in the cache, run the authentication flow.
        # On a first-time authentication (no code yet) this call raises an AuthException
        # that carries the authorization URL.
        response = self._execute_ms_graph_api_auth_flow()

        if "access_token" in response:
            return response['access_token']

        # The authorization code was exchanged, but no access token came back.
        raise AuthException(
            f'Error getting access token: {response.get("error_description")}',
            auth_url=response.get('auth_url')
        )

    def _execute_ms_graph_api_auth_flow(self) -> Dict:
        """
        Executes the authentication flow for the Microsoft Graph API.
        If the authentication code is provided, the token is acquired by authorization code.
        Otherwise, the authorization request URL is built and reported through an AuthException.

        Raises:
            AuthException: If the authentication code is not provided. The exception carries
                the authorization request URL as `auth_url`.

        Returns:
            Dict: The response from the Microsoft Graph API authentication flow.
        """
        msal_app = self._get_msal_app()

        # If the authentication code is provided, acquire the token by authorization code.
        if self.code:
            return msal_app.acquire_token_by_authorization_code(
                code=self.code,
                scopes=self.scopes,
                redirect_uri=self.redirect_uri
            )

        # If the authentication code is not provided, get the authorization request URL
        # and hand it to the caller via the exception.
        auth_url = msal_app.get_authorization_request_url(
            scopes=self.scopes,
            redirect_uri=self.redirect_uri
        )

        raise AuthException(f'Authorisation required. Please follow the url: {auth_url}', auth_url=auth_url)
class MSGraphAPIApplicationPermissionsManager(MSGraphAPIPermissionsManager):
    """
    The class for managing application permissions for the Microsoft Graph API.
    """

    def get_access_token(self) -> Text:
        """
        Retrieves an access token for the Microsoft Graph API using the client credentials flow.
        If a valid access token is found in the cache, it is returned.
        Otherwise, a new token is acquired with the application's own credentials.

        Raises:
            AuthException: If the token request did not yield an access token.

        Returns:
            Text: The access token for the Microsoft Graph API.
        """
        msal_app = self._get_msal_app()

        # Check if a valid access token is already in the cache.
        accounts = msal_app.get_accounts()
        if accounts:
            response = msal_app.acquire_token_silent(self.scopes, account=accounts[0])
            # acquire_token_silent returns None when the cache holds no usable token;
            # guard against it so we fall through to the client credentials request
            # instead of failing with a TypeError on the membership test.
            if response and "access_token" in response:
                return response["access_token"]

        # If no valid access token is found in the cache, acquire a new token using client credentials.
        response = msal_app.acquire_token_for_client(scopes=self.scopes)

        if "access_token" in response:
            return response["access_token"]

        raise AuthException(
            f"Error getting access token: {response.get('error_description')}"
        )