Coverage for mindsdb / integrations / utilities / handlers / auth_utilities / microsoft / ms_graph_api_auth_utilities.py: 72%

64 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from abc import ABC, abstractmethod 

2from typing import Dict, List, Text 

3 

4from flask import request 

5import msal 

6 

7from mindsdb.integrations.utilities.handlers.auth_utilities.exceptions import AuthException 

8from mindsdb.utilities import log 

9 

10 

11logger = log.getLogger(__name__) 

12 

13 

class MSGraphAPIPermissionsManager(ABC):
    """
    The base class for managing permissions for the Microsoft Graph API.

    It stores the application credentials, the token cache and the requested scopes,
    and builds the MSAL confidential client that subclasses use to acquire tokens.
    """

    def __init__(
        self,
        client_id: Text,
        client_secret: Text,
        tenant_id: Text,
        cache: msal.SerializableTokenCache,
        scopes: List = None,
    ) -> None:
        """
        Initializes the permissions manager.

        Args:
            client_id (Text): The client ID of the application registered in Microsoft Entra ID.
            client_secret (Text): The client secret of the application registered in Microsoft Entra ID.
            tenant_id (Text): The tenant ID of the application registered in Microsoft Entra ID.
            cache (msal.SerializableTokenCache): The token cache for storing the access token.
            scopes (List): The scopes for the Microsoft Graph API.
                Defaults to ["https://graph.microsoft.com/.default"] when omitted or None.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.tenant_id = tenant_id
        self.cache = cache
        # Build the default list per instance: a list literal in the signature would be
        # shared (and mutable) across every manager created without explicit scopes.
        self.scopes = ["https://graph.microsoft.com/.default"] if scopes is None else scopes

    @abstractmethod
    def get_access_token(self) -> Text:
        """
        Retrieves an access token for the Microsoft Graph API.

        Returns:
            Text: The access token for the Microsoft Graph API.
        """
        pass

    def _get_msal_app(self) -> msal.ConfidentialClientApplication:
        """
        Returns an instance of the MSAL ConfidentialClientApplication.

        A new application object is created on every call; it is configured with the
        tenant-specific authority and with this manager's token cache.

        Returns:
            msal.ConfidentialClientApplication: An instance of the MSAL ConfidentialClientApplication.
        """
        return msal.ConfidentialClientApplication(
            self.client_id,
            authority=f"https://login.microsoftonline.com/{self.tenant_id}",
            client_credential=self.client_secret,
            token_cache=self.cache,
        )

66 

67 

class MSGraphAPIDelegatedPermissionsManager(MSGraphAPIPermissionsManager):
    """
    The class for managing the delegated permissions for the Microsoft Graph API.

    Tokens are obtained on behalf of a signed-in user via the authorization code flow.
    """

    def __init__(
        self,
        client_id: Text,
        client_secret: Text,
        tenant_id: Text,
        cache: msal.SerializableTokenCache,
        scopes: List = None,
        code: Text = None,
    ) -> None:
        """
        Initializes the delegated permissions manager.

        Args:
            client_id (Text): The client ID of the application registered in Microsoft Entra ID.
            client_secret (Text): The client secret of the application registered in Microsoft Entra ID.
            tenant_id (Text): The tenant ID of the application registered in Microsoft Entra ID.
            cache (msal.SerializableTokenCache): The token cache for storing the access token.
            scopes (List): The scopes for the Microsoft Graph API.
                Defaults to ["https://graph.microsoft.com/.default"] when omitted or None.
            code (Text): The authentication code for acquiring the access token.

        Raises:
            AuthException: If the request origin could not be determined.
        """
        # Resolve the default here rather than in the signature so that instances
        # never share (and cannot accidentally mutate) one default list.
        if scopes is None:
            scopes = ["https://graph.microsoft.com/.default"]

        super().__init__(client_id, client_secret, tenant_id, cache, scopes)
        self.code = code
        self.redirect_uri = None
        self._set_redirect_uri()

    def _set_redirect_uri(self) -> None:
        """
        Sets the redirect URI based on the request origin.

        Outside of a Flask request context the origin is treated as empty, so the
        redirect URI becomes just '/verify-auth'.

        Raises:
            AuthException: If the request origin could not be determined.
        """
        # Set the redirect URI based on the request origin.
        # If the request origin is 127.0.0.1 (localhost), replace it with localhost.
        # This is done because the only HTTP origin allowed in Microsoft Entra ID app registration is localhost.
        try:
            request_origin = request.headers.get('ORIGIN') or (request.scheme + '://' + request.host)
            if not request_origin:
                raise AuthException('Request origin could not be determined!')
        except RuntimeError:
            # if it is outside of request context (streaming in agent)
            request_origin = ''

        if 'http://127.0.0.1' in request_origin:
            request_origin = request_origin.replace('127.0.0.1', 'localhost')

        self.redirect_uri = request_origin + '/verify-auth'

    def get_access_token(self) -> Text:
        """
        Retrieves an access token for the Microsoft Graph API.
        If a valid access token is found in the cache, it is returned.
        Otherwise, the authentication flow is executed.

        Raises:
            AuthException: If authorisation is still required (no code was provided) or
                the token could not be acquired with the provided code.

        Returns:
            Text: The access token for the Microsoft Graph API.
        """
        # Check if a valid access token is already in the cache for the signed-in user.
        msal_app = self._get_msal_app()
        accounts = msal_app.get_accounts()

        if accounts:
            response = msal_app.acquire_token_silent(self.scopes, account=accounts[0])
            # acquire_token_silent returns None when the cache holds no usable token;
            # guard against it so that we fall through to the authentication flow
            # instead of failing with a TypeError on the membership test.
            if response and "access_token" in response:
                return response['access_token']

        # If no valid access token is found in the cache, run the authentication flow.
        response = self._execute_ms_graph_api_auth_flow()

        if "access_token" in response:
            return response['access_token']

        # If no access token is returned, raise an exception carrying the error details.
        raise AuthException(
            f'Error getting access token: {response.get("error_description")}',
            auth_url=response.get('auth_url')
        )

    def _execute_ms_graph_api_auth_flow(self) -> Dict:
        """
        Executes the authentication flow for the Microsoft Graph API.
        If the authentication code is provided, the token is acquired by authorization code.
        Otherwise, an AuthException carrying the authorization request URL is raised.
        This is the expected behaviour when the user attempts to authenticate for the first time.

        Raises:
            AuthException: If the authentication code is not provided

        Returns:
            Dict: The response from the Microsoft Graph API authentication flow.
        """
        msal_app = self._get_msal_app()

        # If the authentication code is provided, acquire the token by authorization code.
        if self.code:
            return msal_app.acquire_token_by_authorization_code(
                code=self.code,
                scopes=self.scopes,
                redirect_uri=self.redirect_uri
            )

        # If the authentication code is not provided, get the authorization request URL
        # and hand it to the caller through the exception.
        auth_url = msal_app.get_authorization_request_url(
            scopes=self.scopes,
            redirect_uri=self.redirect_uri
        )

        raise AuthException(f'Authorisation required. Please follow the url: {auth_url}', auth_url=auth_url)

181 

182 

class MSGraphAPIApplicationPermissionsManager(MSGraphAPIPermissionsManager):
    """
    The class for managing application permissions for the Microsoft Graph API.
    """

    def get_access_token(self) -> Text:
        """
        Retrieves an access token for the Microsoft Graph API using the client credentials flow.
        If a valid access token is found in the cache, it is returned.
        Otherwise, a new token is acquired with the client credentials.

        Raises:
            AuthException: If no access token could be acquired.

        Returns:
            Text: The access token for the Microsoft Graph API.
        """
        msal_app = self._get_msal_app()

        # Check if a valid access token is already in the cache.
        accounts = msal_app.get_accounts()
        if accounts:
            response = msal_app.acquire_token_silent(self.scopes, account=accounts[0])
            # acquire_token_silent returns None when the cache holds no usable token;
            # guard against it so that we fall through to the client credentials flow
            # instead of failing with a TypeError on the membership test.
            if response and "access_token" in response:
                return response["access_token"]

        # If no valid access token is found in the cache, acquire a new token using client credentials.
        response = msal_app.acquire_token_for_client(scopes=self.scopes)

        if "access_token" in response:
            return response["access_token"]

        raise AuthException(
            f"Error getting access token: {response.get('error_description')}"
        )