Coverage for mindsdb / integrations / utilities / handlers / auth_utilities / google / google_service_account_oauth_utilities.py: 36%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import json 

2import requests 

3from typing import Union 

4from google.oauth2 import service_account 

5 

6from mindsdb.utilities import log 

7 

8from ..exceptions import NoCredentialsException, AuthException 

9 

10 

11logger = log.getLogger(__name__) 

12 

13 

14class GoogleServiceAccountOAuth2Manager: 

15 def __init__(self, credentials_url: str = None, credentials_file: str = None, credentials_json: Union[dict, str] = None) -> None: 

16 # if no credentials provided, raise an exception 

17 if not any([credentials_url, credentials_file, credentials_json]): 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true

18 raise NoCredentialsException('No valid Google Service Account credentials provided.') 

19 self.credentials_url = credentials_url 

20 self.credentials_file = credentials_file 

21 if credentials_json: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true

22 self.credentials_json = self._parse_credentials_json(credentials_json) 

23 else: 

24 self.credentials_json = None 

25 

26 def get_oauth2_credentials(self): 

27 try: 

28 if self.credentials_url: 

29 creds = service_account.Credentials.from_service_account_info(self._download_credentials_file()) 

30 return creds 

31 

32 if self.credentials_file: 

33 creds = service_account.Credentials.from_service_account_file(self.credentials_file) 

34 return creds 

35 

36 if self.credentials_json: 

37 creds = service_account.Credentials.from_service_account_info(self.credentials_json) 

38 return creds 

39 except Exception as e: 

40 raise AuthException(f"Authentication failed: {e}") 

41 

42 def _download_credentials_file(self): 

43 response = requests.get(self.credentials_url) 

44 # raise a HTTPError if the status is 4xx or 5xx 

45 response.raise_for_status() 

46 

47 return self._parse_credentials_json(response.json()) 

48 

49 def _parse_credentials_json(self, credentials_json: str) -> dict: 

50 if isinstance(credentials_json, str): 

51 try: 

52 # attempt to convert to JSON 

53 return json.loads(credentials_json) 

54 except json.JSONDecodeError: 

55 raise ValueError("Failed to parse credentials provided. Please provide a valid service account key.") 

56 else: 

57 # unescape new lines in private_key 

58 credentials_json['private_key'] = credentials_json['private_key'].replace('\\n', '\n') 

59 return credentials_json