Coverage for mindsdb / integrations / handlers / google_analytics_handler / google_analytics_handler.py: 0%

73 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb_sql_parser import parse_sql 

2from mindsdb.integrations.libs.api_handler import APIHandler 

3from mindsdb.utilities import log 

4from mindsdb.integrations.handlers.google_analytics_handler.google_analytics_tables import ConversionEventsTable 

5from mindsdb.integrations.libs.response import ( 

6 HandlerStatusResponse as StatusResponse, 

7 HandlerResponse as Response, 

8) 

9 

10import json 

11import os 

12 

13from google.analytics.admin_v1beta import AnalyticsAdminServiceClient 

14from google.oauth2 import service_account 

15from google.auth.transport.requests import Request 

16from googleapiclient.errors import HttpError 

17 

# OAuth scopes requested when the connection arguments do not supply a
# 'scopes' entry of their own.
DEFAULT_SCOPES = [
    'https://www.googleapis.com/auth/analytics.readonly',
    'https://www.googleapis.com/auth/analytics.edit',
    'https://www.googleapis.com/auth/analytics',
]

# Module-level logger named after this module.
logger = log.getLogger(__name__)

24 

25 

class GoogleAnalyticsHandler(APIHandler):
    """A class for handling connections and interactions with the Google Analytics Admin API.

    Attributes:
        credentials_file (str): The value of the ``credentials`` connection
            argument when one was supplied, otherwise ``None``.
        scopes (List[str], Optional): The scopes to use when authenticating with the
            Google Analytics API. Defaults to ``DEFAULT_SCOPES``.
        property_id: The Google Analytics property the handler operates on.
        page_size (int): Page size stored on the handler (500).
        service: The Admin API client, created lazily by ``connect``.
        is_connected (bool): Whether ``service`` currently holds a client.
    """

    name = 'google_analytics'

    def __init__(self, name: str, **kwargs):
        """Store the connection arguments and register the handler's tables.

        Parameters
        ----------
        name: str
            Name of the handler instance, forwarded to ``APIHandler``.
        **kwargs
            ``connection_data`` is read as the dict of connection arguments.
            It must contain ``property_id``.

        Raises
        ------
        KeyError
            If ``property_id`` is missing from the connection arguments.
        """
        super().__init__(name)
        self.page_size = 500
        self.connection_args = kwargs.get('connection_data', {})
        self.property_id = self.connection_args['property_id']

        # Always define the attribute so reading it never raises
        # AttributeError when no 'credentials' argument was supplied.
        self.credentials_file = None
        if self.connection_args.get('credentials'):
            self.credentials_file = self.connection_args.pop('credentials')

        self.scopes = self.connection_args.get('scopes', DEFAULT_SCOPES)
        self.service = None
        self.is_connected = False

        conversion_events = ConversionEventsTable(self)
        self.conversion_events = conversion_events
        self._register_table('conversion_events', conversion_events)

    def _get_creds_json(self):
        """Return the service-account info dict from the connection arguments.

        ``credentials_file`` (a path to a JSON file) takes precedence over
        ``credentials_json`` (an already-parsed dict).

        Returns
        -------
        dict
            The service-account info.

        Raises
        ------
        Exception
            If ``credentials_file`` is not an existing file, if
            ``credentials_json`` is not a dict, or if neither is provided.
        """
        if 'credentials_file' in self.connection_args:
            credentials_path = self.connection_args['credentials_file']
            if not os.path.isfile(credentials_path):
                raise Exception("credentials_file must be a file path")
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(credentials_path, encoding='utf-8') as source:
                return json.load(source)

        if 'credentials_json' in self.connection_args:
            info = self.connection_args['credentials_json']
            if not isinstance(info, dict):
                raise Exception("credentials_json has to be dict")
            # Work on a shallow copy so the caller's connection arguments are
            # not modified as a side effect of reading them.
            info = dict(info)
            # Turn literal backslash-n sequences in the key into real newlines.
            info['private_key'] = info['private_key'].replace('\\n', '\n')
            return info

        raise Exception('Connection args have to content ether credentials_file or credentials_json')

    def create_connection(self):
        """Build an Admin API client from the configured service-account info.

        Returns
        -------
        AnalyticsAdminServiceClient
            A client constructed with the service-account credentials.
        """
        info = self._get_creds_json()
        creds = service_account.Credentials.from_service_account_info(info=info, scopes=self.scopes)

        # Refresh only credentials that report themselves as expired and
        # refreshable; anything else is handed to the client unchanged.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())

        return AnalyticsAdminServiceClient(credentials=creds)

    def connect(self):
        """
        Authenticate with the Google Analytics Admin API using the configured credentials.

        The client is created on first use and reused while ``is_connected``
        stays ``True``.

        Returns
        -------
        service: object
            The authenticated Google Analytics Admin API service object.
        """
        if self.is_connected is True:
            return self.service

        self.service = self.create_connection()
        self.is_connected = True

        return self.service

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        Connects and lists the conversion events of the configured property.
        Any failure is reported through the returned status, not raised.

        Returns
        -------
        response
            Status confirmation; ``error_message`` is set on failure.
        """
        response = StatusResponse(False)

        try:
            # Call the Google Analytics API
            service = self.connect()
            result = service.list_conversion_events(parent=f'properties/{self.property_id}')

            if result is not None:
                response.success = True
        except Exception as error:
            # Catch broadly on purpose: credential loading raises plain
            # Exception and the client raises its own error types, none of
            # which are HttpError. A status check should report, not crash.
            response.error_message = f'Error connecting to Google Analytics api: {error}.'
            logger.error(response.error_message)

        if response.success is False and self.is_connected is True:
            # Force the next connect() call to build a fresh client.
            self.is_connected = False

        return response

    def native_query(self, query_string: str = None) -> Response:
        """Parse a raw SQL string and execute it through ``query``.

        Parameters
        ----------
        query_string: str
            The SQL text to run.

        Returns
        -------
        Response
            Whatever ``self.query`` returns for the parsed statement.
        """
        ast = parse_sql(query_string)

        return self.query(ast)

    def get_api_url(self, endpoint):
        """Return ``endpoint`` joined with the configured property id by a slash."""
        return f'{endpoint}/{self.property_id}'