Coverage for mindsdb / integrations / utilities / handlers / auth_utilities / google / google_service_account_oauth_utilities.py: 36%
41 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import json
2import requests
3from typing import Union
4from google.oauth2 import service_account
6from mindsdb.utilities import log
8from ..exceptions import NoCredentialsException, AuthException
11logger = log.getLogger(__name__)
14class GoogleServiceAccountOAuth2Manager:
15 def __init__(self, credentials_url: str = None, credentials_file: str = None, credentials_json: Union[dict, str] = None) -> None:
16 # if no credentials provided, raise an exception
17 if not any([credentials_url, credentials_file, credentials_json]): 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true
18 raise NoCredentialsException('No valid Google Service Account credentials provided.')
19 self.credentials_url = credentials_url
20 self.credentials_file = credentials_file
21 if credentials_json: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true
22 self.credentials_json = self._parse_credentials_json(credentials_json)
23 else:
24 self.credentials_json = None
26 def get_oauth2_credentials(self):
27 try:
28 if self.credentials_url:
29 creds = service_account.Credentials.from_service_account_info(self._download_credentials_file())
30 return creds
32 if self.credentials_file:
33 creds = service_account.Credentials.from_service_account_file(self.credentials_file)
34 return creds
36 if self.credentials_json:
37 creds = service_account.Credentials.from_service_account_info(self.credentials_json)
38 return creds
39 except Exception as e:
40 raise AuthException(f"Authentication failed: {e}")
42 def _download_credentials_file(self):
43 response = requests.get(self.credentials_url)
44 # raise a HTTPError if the status is 4xx or 5xx
45 response.raise_for_status()
47 return self._parse_credentials_json(response.json())
49 def _parse_credentials_json(self, credentials_json: str) -> dict:
50 if isinstance(credentials_json, str):
51 try:
52 # attempt to convert to JSON
53 return json.loads(credentials_json)
54 except json.JSONDecodeError:
55 raise ValueError("Failed to parse credentials provided. Please provide a valid service account key.")
56 else:
57 # unescape new lines in private_key
58 credentials_json['private_key'] = credentials_json['private_key'].replace('\\n', '\n')
59 return credentials_json