Coverage for mindsdb / integrations / handlers / google_analytics_handler / google_analytics_tables.py: 0%
125 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
from types import SimpleNamespace
from typing import List, Optional

import pandas as pd
from google.analytics.admin_v1beta import (
    ConversionEvent,
    CreateConversionEventRequest,
    DeleteConversionEventRequest,
    ListConversionEventsRequest,
    UpdateConversionEventRequest,
)
from mindsdb_sql_parser import Constant
from mindsdb_sql_parser import ast

from mindsdb.integrations.libs.api_handler import APITable
from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions
class ConversionEventsTable(APITable):
    """Table mapping SQL statements onto GA4 conversion-event operations.

    SELECT / INSERT / UPDATE / DELETE are translated into requests sent
    through the Google Analytics Admin API client returned by
    ``self.handler.connect()``.
    """

    def select(self, query: ast.Select) -> pd.DataFrame:
        """
        Gets all conversion events from google analytics property.

        Args:
            query (ast.Select): SQL query to parse.

        Returns:
            pd.DataFrame: One row per conversion event, restricted to the
                requested columns (kept in the order of ``get_columns()``).

        Raises:
            NotImplementedError: If the WHERE clause references anything
                other than ``page_size``, or if ORDER BY / LIMIT is used.
            ValueError: If a query target is neither ``*`` nor a column
                identifier.
        """
        # Only `page_size` is understood in the WHERE clause.
        # NOTE(review): the comparison operator is not inspected here, so
        # any operator on `page_size` is treated as an assignment.
        params = {}
        for op, arg1, arg2 in extract_comparison_conditions(query.where):
            if arg1 != 'page_size':
                raise NotImplementedError(f'Unknown clause: {arg1}')
            params[arg1] = arg2

        if query.order_by is not None:
            raise NotImplementedError('ORDER BY is not supported')

        if query.limit is not None:
            raise NotImplementedError('LIMIT is not supported')

        # Resolve the requested columns before calling the API so that an
        # invalid query fails without a network round trip.
        selected_columns = []
        for target in query.targets:
            if isinstance(target, ast.Star):
                selected_columns = self.get_columns()
                break
            elif isinstance(target, ast.Identifier):
                selected_columns.append(target.parts[-1])
            else:
                raise ValueError(f"Unknown query target {type(target)}")

        # Get the conversion events from the Google Analytics Admin API.
        result = self.get_conversion_events(params=params)
        rows = self.extract_conversion_events_data(result.conversion_events)
        events = self.concat_dataframes(
            pd.DataFrame(columns=self.get_columns()), rows
        )

        if events.empty:
            return pd.DataFrame([], columns=selected_columns)

        # Keep only the requested columns; unknown names are ignored.
        wanted = set(selected_columns)
        return events[[col for col in events.columns if col in wanted]]

    def insert(self, query: ast.Insert):
        """
        Inserts a conversion event into your GA4 property.

        One conversion event is created for every row of the INSERT.

        Args:
            query (ast.Insert): SQL query to parse.

        Raises:
            ValueError: If a column other than ``event_name`` /
                ``countingMethod`` is supplied, or ``event_name`` is missing.
        """
        columns = [col.name for col in query.columns]

        supported_columns = {'event_name', 'countingMethod'}
        unsupported_columns = set(columns) - supported_columns
        if unsupported_columns:
            raise ValueError(
                "Unsupported columns for create conversion event: "
                + ", ".join(sorted(unsupported_columns))
            )
        if 'event_name' not in columns:
            raise ValueError(
                "Column 'event_name' is required to create a conversion event"
            )

        for row in query.values:
            params = dict(zip(columns, row))

            # Values may arrive as plain Python objects or as parser
            # <Constant> nodes; plain strings are treated as integer codes.
            if 'countingMethod' in params:
                counting_method = params['countingMethod']
                if isinstance(counting_method, str):
                    counting_method = int(counting_method)
                elif isinstance(counting_method, Constant):
                    counting_method = counting_method.value
                params['countingMethod'] = counting_method

            if isinstance(params['event_name'], Constant):
                params['event_name'] = params['event_name'].value

            # Insert the conversion event into the Google Analytics Admin API.
            self.create_conversion_event(params=params)

    def update(self, query: ast.Update):
        """
        Updates a conversion event into your GA4 property.

        The event is identified by ``WHERE name = ...`` and the new counting
        method is taken from ``SET countingMethod = ...``.

        Args:
            query (ast.Update): SQL query to parse.

        Raises:
            NotImplementedError: If the WHERE clause uses a column other than
                ``name`` or an operator other than ``=``.
            ValueError: If ``countingMethod`` is not being set or no
                ``name`` condition is given.
        """
        # Unwrap parser <Constant> nodes into their Python values.
        params = {
            col: (val.value if isinstance(val, Constant) else val)
            for col, val in query.update_columns.items()
        }

        name = None
        for op, arg1, arg2 in extract_comparison_conditions(query.where):
            if arg1 != 'name':
                raise NotImplementedError(f'Unknown clause: {arg1}')
            if op != '=':
                raise NotImplementedError(f'Unknown op: {op}')
            name = arg2

        if 'countingMethod' not in params:
            raise ValueError(
                "Column 'countingMethod' must be set to update a conversion event"
            )
        if name is None:
            raise ValueError(
                "A 'name = ...' condition is required to update a conversion event"
            )
        # The target always comes from the WHERE clause, never from SET.
        params['name'] = name

        # Update the conversion event in the Google Analytics Admin API.
        self.update_conversion_event(params=params)

    def delete(self, query: ast.Delete):
        """
        Deletes a conversion event into your GA4 property.

        Every ``name = ...`` condition triggers one delete request.

        Args:
            query (ast.Delete): SQL query to parse.

        Raises:
            NotImplementedError: For OR conditions, columns other than
                ``name``, or operators other than ``=``.
        """
        # Parse the query to get the conditions.
        for op, arg1, arg2 in extract_comparison_conditions(query.where):
            if op == 'or':
                raise NotImplementedError('OR is not supported')
            if arg1 != 'name':
                raise NotImplementedError(f'Unknown clause: {arg1}')
            if op != '=':
                raise NotImplementedError(f'Unknown op: {op}')
            self.delete_conversion_event(params={'name': arg2})

    def get_conversion_events(self, params: Optional[dict] = None):
        """
        List all conversion events in your GA4 property

        Pages are requested until no ``next_page_token`` is returned and the
        events of every page are collected.

        Args:
            params (dict): extra request fields (e.g. ``page_size``)
        Returns:
            Object with a ``conversion_events`` attribute holding the
            ConversionEvent objects of all pages, and an empty
            ``next_page_token``.
        """
        service = self.handler.connect()
        parent = self.handler.get_api_url('properties')
        request_fields = dict(params or {})

        collected = []
        page_token = None
        while True:
            request = ListConversionEventsRequest(
                parent=parent, page_token=page_token, **request_fields
            )
            page = service.list_conversion_events(request)
            # Accumulate instead of keeping only the final page.
            collected.extend(page.conversion_events)

            page_token = page.next_page_token
            if not page_token:
                break
        return SimpleNamespace(conversion_events=collected, next_page_token='')

    def create_conversion_event(self, params: Optional[dict] = None):
        """
        Create a conversion event in your property.

        Args:
            params (dict): must contain ``event_name``; ``countingMethod``
                is forwarded only when present.
                NOTE(review): assumes the Admin API treats counting_method
                as optional - confirm against the API reference.
        Returns:
            ConversionEvent object
        """
        service = self.handler.connect()
        parent = self.handler.get_api_url('properties')

        event_fields = {'event_name': params['event_name']}
        counting_method = params.get('countingMethod')
        if counting_method is not None:
            event_fields['counting_method'] = counting_method

        request = CreateConversionEventRequest(
            conversion_event=ConversionEvent(**event_fields),
            parent=parent,
        )
        return service.create_conversion_event(request)

    def update_conversion_event(self, params: Optional[dict] = None):
        """
        Update a conversion event in your property.

        Args:
            params (dict): must contain ``name`` and ``countingMethod``
        Returns:
            ConversionEvent object
        """
        service = self.handler.connect()

        conversion_event = ConversionEvent(
            name=params['name'],
            counting_method=params['countingMethod'],
        )
        # NOTE(review): the mask is passed as the plain string '*';
        # confirm the client accepts this form for a FieldMask field.
        request = UpdateConversionEventRequest(
            conversion_event=conversion_event,
            update_mask='*',
        )
        return service.update_conversion_event(request)

    def delete_conversion_event(self, params: Optional[dict] = None):
        """
        Delete a conversion event in your property.

        Args:
            params (dict): must contain ``name``
        """
        service = self.handler.connect()
        request = DeleteConversionEventRequest(name=params['name'])
        service.delete_conversion_event(request)

    @staticmethod
    def extract_conversion_events_data(conversion_events):
        """
        Extract conversion events data and return a list of lists.

        Args:
            conversion_events: List of ConversionEvent objects
        Returns:
            List of lists containing conversion event data, with values in
            the same order as ``get_columns()``; the counting method is
            reported by its enum member name.
        """
        return [
            [
                event.name,
                event.event_name,
                event.create_time,
                event.deletable,
                event.custom,
                event.ConversionCountingMethod(event.counting_method).name,
            ]
            for event in conversion_events
        ]

    def concat_dataframes(self, existing_df, data):
        """
        Concatenate existing DataFrame with new data.

        Args:
            existing_df: Existing DataFrame
            data: New rows, each ordered like ``get_columns()``
        Returns:
            Concatenated DataFrame with a fresh 0..n-1 index
        """
        new_rows = pd.DataFrame(data, columns=self.get_columns())
        return pd.concat([existing_df, new_rows], ignore_index=True)

    def get_columns(self) -> List[str]:
        """
        Gets all columns to be returned in pandas DataFrame responses

        Returns:
            List[str]: List of columns
        """
        return [
            'name',
            'event_name',
            'create_time',
            'deletable',
            'custom',
            'countingMethod',
        ]