Coverage for mindsdb / integrations / handlers / google_analytics_handler / google_analytics_tables.py: 0%

125 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2from typing import List 

3 

4from google.analytics.admin_v1beta import ListConversionEventsRequest, ConversionEvent, CreateConversionEventRequest, \ 

5 UpdateConversionEventRequest, DeleteConversionEventRequest 

6from mindsdb_sql_parser import Constant 

7from mindsdb_sql_parser import ast 

8from mindsdb.integrations.libs.api_handler import APITable 

9from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions 

10 

11 

12class ConversionEventsTable(APITable): 

13 

14 def select(self, query: ast.Select) -> pd.DataFrame: 

15 """ 

16 Gets all conversion events from google analytics property. 

17 

18 Args: 

19 query (ast.Select): SQL query to parse. 

20 

21 Returns: 

22 Response: Response object containing the results. 

23 """ 

24 # Parse the query to get the conditions. 

25 conditions = extract_comparison_conditions(query.where) 

26 # Get the page size from the conditions. 

27 params = {} 

28 for op, arg1, arg2 in conditions: 

29 if arg1 == 'page_size': 

30 params[arg1] = arg2 

31 else: 

32 raise NotImplementedError 

33 

34 # Get the order by from the query. 

35 if query.order_by is not None: 

36 raise NotImplementedError 

37 

38 if query.limit is not None: 

39 raise NotImplementedError 

40 

41 # Get the conversion events from the Google Analytics Admin API. 

42 conversion_events = pd.DataFrame(columns=self.get_columns()) 

43 result = self.get_conversion_events(params=params) 

44 conversion_events_data = self.extract_conversion_events_data(result.conversion_events) 

45 events = self.concat_dataframes(conversion_events, conversion_events_data) 

46 

47 selected_columns = [] 

48 for target in query.targets: 

49 if isinstance(target, ast.Star): 

50 selected_columns = self.get_columns() 

51 break 

52 elif isinstance(target, ast.Identifier): 

53 selected_columns.append(target.parts[-1]) 

54 else: 

55 raise ValueError(f"Unknown query target {type(target)}") 

56 

57 if len(events) == 0: 

58 events = pd.DataFrame([], columns=selected_columns) 

59 else: 

60 events.columns = self.get_columns() 

61 for col in set(events.columns).difference(set(selected_columns)): 

62 events = events.drop(col, axis=1) 

63 return events 

64 

65 def insert(self, query: ast.Insert): 

66 """ 

67 Inserts a conversion event into your GA4 property. 

68 

69 Args: 

70 query (ast.Insert): SQL query to parse. 

71 """ 

72 columns = [col.name for col in query.columns] 

73 

74 supported_columns = {'event_name', 'countingMethod'} 

75 if not set(columns).issubset(supported_columns): 

76 unsupported_columns = set(columns).difference(supported_columns) 

77 raise ValueError( 

78 "Unsupported columns for create conversion event: " 

79 + ", ".join(unsupported_columns) 

80 ) 

81 params = {} 

82 

83 for row in query.values: 

84 params = dict(zip(columns, row)) 

85 

86 # get params values of a type <Constant> 

87 if isinstance(params['countingMethod'], str): 

88 params['countingMethod'] = int(params['countingMethod']) 

89 elif isinstance(params['countingMethod'], Constant): 

90 params['countingMethod'] = params['countingMethod'].value 

91 else: 

92 params['countingMethod'] = params['countingMethod'] 

93 

94 if isinstance(params['event_name'], Constant): 

95 params['event_name'] = params['event_name'].value 

96 else: 

97 params['event_name'] = params['event_name'] 

98 

99 # Insert the conversion event into the Google Analytics Admin API. 

100 conversion_events = pd.DataFrame(columns=self.get_columns()) 

101 result = self.create_conversion_event(params=params) 

102 conversion_events_data = self.extract_conversion_events_data([result]) 

103 self.concat_dataframes(conversion_events, conversion_events_data) 

104 

105 def update(self, query: ast.Update): 

106 """ 

107 Updates a conversion event into your GA4 property. 

108 

109 Args: 

110 query (ast.Update): SQL query to parse. 

111 """ 

112 # Get the values from the query. 

113 values = query.update_columns.items() 

114 data_list = list(values) 

115 # Get the conversion event data from the values. 

116 params = {} 

117 for col, val in zip(query.update_columns, data_list): 

118 params[col] = val 

119 

120 conditions = extract_comparison_conditions(query.where) 

121 for op, arg1, arg2 in conditions: 

122 if arg1 == 'name': 

123 if op == '=': 

124 params['name'] = arg2 

125 else: 

126 raise NotImplementedError 

127 else: 

128 raise NotImplementedError 

129 

130 # get params values of a type <Constant> 

131 params['countingMethod'] = params['countingMethod'][1].value 

132 

133 # Update the conversion event in the Google Analytics Admin API. 

134 conversion_events = pd.DataFrame(columns=self.get_columns()) 

135 result = self.update_conversion_event(params=params) 

136 conversion_events_data = self.extract_conversion_events_data([result]) 

137 self.concat_dataframes(conversion_events, conversion_events_data) 

138 

139 def delete(self, query: ast.Delete): 

140 """ 

141 Deletes a conversion event into your GA4 property. 

142 

143 Args: 

144 query (ast.Delete): SQL query to parse. 

145 """ 

146 

147 # Parse the query to get the conditions. 

148 conditions = extract_comparison_conditions(query.where) 

149 for op, arg1, arg2 in conditions: 

150 if op == 'or': 

151 raise NotImplementedError('OR is not supported') 

152 if arg1 == 'name': 

153 if op == '=': 

154 self.delete_conversion_event(params={'name': arg2}) 

155 else: 

156 raise NotImplementedError(f'Unknown op: {op}') 

157 else: 

158 raise NotImplementedError(f'Unknown clause: {arg1}') 

159 

160 def get_conversion_events(self, params: dict = None): 

161 """ 

162 List all conversion events in your GA4 property 

163 Args: 

164 params (dict): query parameters 

165 Returns: 

166 ConversionEvent objects 

167 """ 

168 service = self.handler.connect() 

169 page_token = None 

170 url = self.handler.get_api_url('properties') 

171 

172 while True: 

173 request = ListConversionEventsRequest(parent=url, 

174 page_token=page_token, **params) 

175 result = service.list_conversion_events(request) 

176 

177 page_token = result.next_page_token 

178 if not page_token: 

179 break 

180 return result 

181 

182 def create_conversion_event(self, params: dict = None): 

183 """ 

184 Create a conversion event in your property. 

185 Args: 

186 params (dict): query parameters 

187 Returns: 

188 ConversionEvent object 

189 """ 

190 service = self.handler.connect() 

191 url = self.handler.get_api_url('properties') 

192 

193 conversion_event = ConversionEvent( 

194 event_name=params['event_name'], 

195 counting_method=params['countingMethod'] 

196 ) 

197 request = CreateConversionEventRequest(conversion_event=conversion_event, 

198 parent=url) 

199 result = service.create_conversion_event(request) 

200 

201 return result 

202 

203 def update_conversion_event(self, params: dict = None): 

204 """ 

205 Update a conversion event in your property. 

206 Args: 

207 params (dict): query parameters 

208 Returns: 

209 ConversionEvent object 

210 """ 

211 service = self.handler.connect() 

212 

213 conversion_event = ConversionEvent( 

214 name=params['name'], 

215 counting_method=params['countingMethod'] 

216 ) 

217 request = UpdateConversionEventRequest(conversion_event=conversion_event, 

218 update_mask='*') 

219 result = service.update_conversion_event(request) 

220 

221 return result 

222 

223 def delete_conversion_event(self, params: dict = None): 

224 """ 

225 Delete a conversion event in your property. 

226 Args: 

227 params (dict): query parameters 

228 """ 

229 service = self.handler.connect() 

230 request = DeleteConversionEventRequest(name=params['name']) 

231 service.delete_conversion_event(request) 

232 

233 @staticmethod 

234 def extract_conversion_events_data(conversion_events): 

235 """ 

236 Extract conversion events data and return a list of lists. 

237 Args: 

238 conversion_events: List of ConversionEvent objects 

239 Returns: 

240 List of lists containing conversion event data 

241 """ 

242 conversion_events_data = [] 

243 for conversion_event in conversion_events: 

244 data_row = [ 

245 conversion_event.name, 

246 conversion_event.event_name, 

247 conversion_event.create_time, 

248 conversion_event.deletable, 

249 conversion_event.custom, 

250 conversion_event.ConversionCountingMethod(conversion_event.counting_method).name, 

251 ] 

252 conversion_events_data.append(data_row) 

253 return conversion_events_data 

254 

255 def concat_dataframes(self, existing_df, data): 

256 """ 

257 Concatenate existing DataFrame with new data. 

258 Args: 

259 existing_df: Existing DataFrame 

260 data: New data to be added to the DataFrame 

261 Returns: 

262 Concatenated DataFrame 

263 """ 

264 return pd.concat( 

265 [existing_df, pd.DataFrame(data, columns=self.get_columns())], 

266 ignore_index=True 

267 ) 

268 

269 def get_columns(self) -> List[str]: 

270 """ 

271 Gets all columns to be returned in pandas DataFrame responses 

272 

273 Returns: 

274 List[str]: List of columns 

275 """ 

276 return [ 

277 'name', 

278 'event_name', 

279 'create_time', 

280 'deletable', 

281 'custom', 

282 'countingMethod', 

283 ]