Coverage for mindsdb / integrations / handlers / google_analytics_handler / google_analytics_handler.py: 0%
73 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb_sql_parser import parse_sql
2from mindsdb.integrations.libs.api_handler import APIHandler
3from mindsdb.utilities import log
4from mindsdb.integrations.handlers.google_analytics_handler.google_analytics_tables import ConversionEventsTable
5from mindsdb.integrations.libs.response import (
6 HandlerStatusResponse as StatusResponse,
7 HandlerResponse as Response,
8)
10import json
11import os
13from google.analytics.admin_v1beta import AnalyticsAdminServiceClient
14from google.oauth2 import service_account
15from google.auth.transport.requests import Request
16from googleapiclient.errors import HttpError
# OAuth scopes used by the handler when the connection arguments do not
# provide a 'scopes' entry of their own.
DEFAULT_SCOPES = [
    'https://www.googleapis.com/auth/analytics.readonly',
    'https://www.googleapis.com/auth/analytics.edit',
    'https://www.googleapis.com/auth/analytics',
]

# Module-level logger, named after this module.
logger = log.getLogger(__name__)
class GoogleAnalyticsHandler(APIHandler):
    """Handle connections and interactions with the Google Analytics Admin API.

    Attributes:
        page_size (int): Page size stored on the handler (500).
        connection_args (dict): The ``connection_data`` mapping passed to the constructor.
        property_id: The ``property_id`` entry of the connection arguments.
        credentials_file: Value of the optional ``credentials`` connection argument,
            or None when it was not supplied.
        scopes (List[str]): The scopes to use when authenticating with the Google
            Analytics API; defaults to ``DEFAULT_SCOPES``.
        service: The cached API client, or None before the first successful connect.
        is_connected (bool): Whether ``service`` currently holds a usable client.
    """

    name = 'google_analytics'

    def __init__(self, name: str, **kwargs):
        """Store the connection arguments and register the handler's tables.

        Args:
            name: Handler instance name, forwarded to ``APIHandler``.
            **kwargs: Must carry ``connection_data`` containing ``property_id``.

        Raises:
            KeyError: If ``property_id`` is missing from the connection arguments.
        """
        super().__init__(name)
        self.page_size = 500
        self.connection_args = kwargs.get('connection_data', {})
        self.property_id = self.connection_args['property_id']

        # Always define the attribute so later reads cannot hit AttributeError.
        self.credentials_file = None
        if self.connection_args.get('credentials'):
            self.credentials_file = self.connection_args.pop('credentials')

        self.scopes = self.connection_args.get('scopes', DEFAULT_SCOPES)
        self.service = None
        self.is_connected = False

        conversion_events = ConversionEventsTable(self)
        self.conversion_events = conversion_events
        self._register_table('conversion_events', conversion_events)

    def _get_creds_json(self):
        """Return the service-account info dict from the connection arguments.

        ``credentials_file`` (a path to a JSON file) takes precedence over
        ``credentials_json`` (an already-parsed dict).

        Raises:
            Exception: If the file path is not a file, if ``credentials_json`` is
                not a dict, or if neither argument is present.
        """
        if 'credentials_file' in self.connection_args:
            path = self.connection_args['credentials_file']
            if not os.path.isfile(path):
                raise Exception("credentials_file must be a file path")
            # Explicit encoding so parsing does not depend on the platform locale.
            with open(path, encoding='utf-8') as source:
                return json.load(source)

        if 'credentials_json' in self.connection_args:
            info = self.connection_args['credentials_json']
            if not isinstance(info, dict):
                raise Exception("credentials_json has to be dict")
            # Work on a shallow copy so the caller's connection arguments are not mutated.
            info = dict(info)
            # Turn literal backslash-n sequences in the key into real newlines.
            info['private_key'] = info['private_key'].replace('\\n', '\n')
            return info

        raise Exception('Connection args have to content ether credentials_file or credentials_json')

    def create_connection(self):
        """Build an ``AnalyticsAdminServiceClient`` from the configured credentials.

        Returns:
            AnalyticsAdminServiceClient: A client bound to the service-account credentials.
        """
        info = self._get_creds_json()
        creds = service_account.Credentials.from_service_account_info(info=info, scopes=self.scopes)

        if not creds or not creds.valid:
            # NOTE(review): service-account credentials may not expose ``refresh_token``;
            # getattr keeps this branch from raising AttributeError if they do not.
            if creds and creds.expired and getattr(creds, 'refresh_token', None):
                creds.refresh(Request())

        return AnalyticsAdminServiceClient(credentials=creds)

    def connect(self):
        """
        Authenticate with the Google Analytics Admin API, reusing a cached client.

        Returns
        -------
        service: object
            The authenticated Google Analytics Admin API service object.
        """
        if self.is_connected:
            return self.service

        self.service = self.create_connection()
        self.is_connected = True

        return self.service

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        Connects and lists the conversion events of the configured property as a
        probe. Any failure is reported in the response instead of being raised.

        Returns
        -------
        response
            Status confirmation
        """
        response = StatusResponse(False)

        try:
            # Call the Google Analytics API
            service = self.connect()
            result = service.list_conversion_events(parent=f'properties/{self.property_id}')

            if result is not None:
                response.success = True
        except Exception as error:
            # Broad on purpose: credential-loading errors and client errors are not
            # HttpError instances, and a status check should report rather than raise.
            response.error_message = f'Error connecting to Google Analytics api: {error}.'
            logger.error(response.error_message)

        # Drop the connected flag on failure so the next connect() rebuilds the client.
        if not response.success and self.is_connected:
            self.is_connected = False

        return response

    def native_query(self, query_string: str = None) -> Response:
        """Parse a raw SQL string and execute it through ``query``.

        Args:
            query_string: SQL text to parse with ``parse_sql``.

        Returns:
            Response: Whatever ``self.query`` returns for the parsed statement.
        """
        ast = parse_sql(query_string)

        return self.query(ast)

    def get_api_url(self, endpoint):
        """Return ``endpoint`` with the configured property id appended as a path segment."""
        return f'{endpoint}/{self.property_id}'