Coverage for mindsdb / integrations / utilities / handlers / auth_utilities / google / google_user_oauth_utilities.py: 22%
59 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import json
2from pathlib import Path
3import requests
4import datetime as dt
5from flask import request
7from mindsdb.utilities import log
9from ..exceptions import AuthException
11from google_auth_oauthlib.flow import Flow
13from google.oauth2.credentials import Credentials
14from google.auth.transport.requests import Request
16logger = log.getLogger(__name__)
class GoogleUserOAuth2Manager:
    """Obtain, refresh and persist Google OAuth2 user credentials.

    Authorized user info is cached in the handler storage under the
    ``'oauth_user_info'`` key. The OAuth client configuration is read from
    ``credentials_url`` (preferred) or ``credentials_file``.
    """

    # Upper bound, in seconds, for downloading the client configuration.
    # Without a timeout an unresponsive host would block the caller forever.
    _DOWNLOAD_TIMEOUT_SECONDS = 30

    def __init__(self, handler_stroage: str, scopes: list, credentials_file: str = None, credentials_url: str = None, code: str = None):
        """Store the configuration needed to run the OAuth2 flow.

        Args:
            handler_stroage: Storage object used to cache the authorized user
                info; ``encrypted_json_get`` and ``encrypted_json_set`` are
                called on it. The (misspelled) parameter name and its ``str``
                annotation are kept for backward compatibility with callers.
            scopes: OAuth2 scopes to request.
            credentials_file: Path to a local OAuth client configuration file.
            credentials_url: URL of an OAuth client configuration; takes
                precedence over ``credentials_file``.
            code: Authorization code used to fetch a token, if available.
        """
        self.handler_storage = handler_stroage
        self.scopes = scopes
        self.credentials_file = credentials_file
        self.credentials_url = credentials_url
        self.code = code

    def get_oauth2_credentials(self):
        """Return usable credentials, refreshing or re-authorizing if needed.

        Returns:
            The credentials object, or ``None`` when neither
            ``credentials_file`` nor ``credentials_url`` is configured.

        Raises:
            AuthException: If user authorization is required (no ``code``).
            ValueError: If a new auth flow is needed but the client
                configuration cannot be loaded.
        """
        creds = None

        if self.credentials_file or self.credentials_url:
            oauth_user_info = self.handler_storage.encrypted_json_get('oauth_user_info')

            if oauth_user_info:
                creds = Credentials.from_authorized_user_info(oauth_user_info, self.scopes)

            if not creds or not creds.valid:
                logger.debug("Credentials do not exist or are invalid, attempting to authorize again")

                if creds and creds.expired and creds.refresh_token:
                    creds.refresh(Request())
                    logger.debug("Credentials refreshed successfully")
                else:
                    # The client configuration is only needed for a brand new
                    # flow, so it is loaded here rather than before a refresh.
                    creds = self._execute_google_auth_flow(self._download_oauth_user_info())
                    logger.debug("New credentials obtained")

                self.handler_storage.encrypted_json_set('oauth_user_info', self._convert_credentials_to_dict(creds))
                logger.debug("Saving credentials to storage")

        return creds

    def _download_oauth_user_info(self):
        """Load the OAuth client configuration from the URL or the file.

        Returns:
            The parsed JSON content of the configuration.

        Raises:
            ValueError: If no configuration could be loaded from either source.
        """
        # if credentials_url is set, attempt to download the contents of the files
        # this will be given preference over credentials_file
        if self.credentials_url:
            response = requests.get(self.credentials_url, timeout=self._DOWNLOAD_TIMEOUT_SECONDS)
            if response.status_code == 200:
                return response.json()
            # The placeholder is required: an extra argument without one makes
            # the logging module fail while formatting the record.
            logger.error("Failed to get credentials from URL, status code: %s", response.status_code)

        # if credentials_file is set, attempt to read the contents of the file
        if self.credentials_file:
            path = Path(self.credentials_file).expanduser()
            if path.exists():
                with open(path, 'r') as f:
                    return json.load(f)
            logger.error("Credentials file does not exist")

        raise ValueError('OAuth2 credentials could not be found')

    def _execute_google_auth_flow(self, oauth_user_info: dict):
        """Run the Google auth flow for the given client configuration.

        Args:
            oauth_user_info: OAuth client configuration passed to
                ``Flow.from_client_config``.

        Returns:
            The credentials obtained by exchanging ``self.code`` for a token.

        Raises:
            AuthException: If no ``code`` is set; carries the authorization
                URL the user has to visit.
        """
        flow = Flow.from_client_config(
            oauth_user_info,
            scopes=self.scopes
        )

        # The redirect URI is derived from the current Flask request, so this
        # must run inside a request that carries an ORIGIN header.
        flow.redirect_uri = request.headers['ORIGIN'] + '/verify-auth'

        if self.code:
            flow.fetch_token(code=self.code)
            return flow.credentials

        auth_url = flow.authorization_url()[0]
        raise AuthException(f'Authorisation required. Please follow the url: {auth_url}', auth_url=auth_url)

    def _convert_credentials_to_dict(self, credentials):
        """Convert a credentials object into a JSON-serialisable dict.

        Args:
            credentials: Object exposing ``token``, ``refresh_token``,
                ``token_uri``, ``client_id``, ``client_secret``, ``scopes``
                and ``expiry`` attributes.

        Returns:
            A dict with those fields; ``expiry`` is formatted as
            ``%Y-%m-%dT%H:%M:%S`` or is ``None`` when no expiry is set.
        """
        expiry = credentials.expiry
        return {
            'token': credentials.token,
            'refresh_token': credentials.refresh_token,
            'token_uri': credentials.token_uri,
            'client_id': credentials.client_id,
            'client_secret': credentials.client_secret,
            'scopes': credentials.scopes,
            # Credentials without an expiry must not break serialisation.
            'expiry': expiry.strftime('%Y-%m-%dT%H:%M:%S') if expiry is not None else None,
        }