Coverage for mindsdb / integrations / handlers / google_content_shopping_handler / google_content_shopping_handler.py: 0%

231 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import json 

2 

3import pandas as pd 

4from pandas import DataFrame 

5from google.auth.transport.requests import Request 

6from google.oauth2 import service_account 

7from google.oauth2.credentials import Credentials 

8from googleapiclient.discovery import build 

9from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE 

10from .google_content_shopping_tables import AccountsTable, OrdersTable, ProductsTable 

11from mindsdb.integrations.libs.api_handler import APIHandler, FuncParser 

12from mindsdb.integrations.libs.response import ( 

13 HandlerStatusResponse as StatusResponse, 

14 HandlerResponse as Response, 

15) 

16from mindsdb.utilities import log 

17 

18logger = log.getLogger(__name__) 

19 

20 

21class GoogleContentShoppingHandler(APIHandler): 

22 """ 

23 A class for handling connections and interactions with the Google Content API for Shopping. 

24 """ 

25 

26 name = "google_content_shopping" 

27 

28 def __init__(self, name: str, **kwargs): 

29 """ 

30 Initialize the Google Content API for Shopping handler. 

31 Args: 

32 name (str): name of the handler 

33 kwargs (dict): additional arguments 

34 """ 

35 super().__init__(name) 

36 self.token = None 

37 self.service = None 

38 self.connection_data = kwargs.get("connection_data", {}) 

39 self.fs_storage = kwargs["file_storage"] 

40 self.credentials_file = self.connection_data.get("credentials", None) 

41 self.merchant_id = self.connection_data.get("merchant_id", None) 

42 self.credentials = None 

43 self.scopes = ["https://www.googleapis.com/auth/content"] 

44 self.is_connected = False 

45 accounts = AccountsTable(self) 

46 self.accounts = accounts 

47 self._register_table("Accounts", accounts) 

48 orders = OrdersTable(self) 

49 self.orders = orders 

50 self._register_table("Orders", orders) 

51 products = ProductsTable(self) 

52 self.products = products 

53 self._register_table("Products", products) 

54 

55 def connect(self): 

56 """ 

57 Set up any connections required by the handler 

58 Should return output of check_connection() method after attempting 

59 connection. Should switch self.is_connected. 

60 Returns: 

61 HandlerStatusResponse 

62 """ 

63 if self.is_connected is True: 

64 return self.service 

65 if self.credentials_file: 

66 try: 

67 json_str_bytes = self.fs_storage.file_get("token_content.json") 

68 json_str = json_str_bytes.decode() 

69 self.credentials = Credentials.from_authorized_user_info(info=json.loads(json_str), scopes=self.scopes) 

70 except Exception: 

71 self.credentials = None 

72 if not self.credentials or not self.credentials.valid: 

73 if self.credentials and self.credentials.expired and self.credentials.refresh_token: 

74 self.credentials.refresh(Request()) 

75 else: 

76 self.credentials = service_account.Credentials.from_service_account_file( 

77 self.credentials_file, scopes=self.scopes 

78 ) 

79 # Save the credentials for the next run 

80 json_str = self.credentials.to_json() 

81 self.fs_storage.file_set("token_content.json", json_str.encode()) 

82 self.service = build("content", "v2.1", credentials=self.credentials) 

83 return self.service 

84 

85 def check_connection(self) -> StatusResponse: 

86 """ 

87 Check connection to the handler 

88 Returns: 

89 HandlerStatusResponse 

90 """ 

91 response = StatusResponse(False) 

92 

93 try: 

94 self.connect() 

95 response.success = True 

96 except Exception as e: 

97 logger.error(f"Error connecting to Google Content API for Shopping: {e}!") 

98 response.error_message = e 

99 

100 self.is_connected = response.success 

101 return response 

102 

103 def native_query(self, query: str = None) -> Response: 

104 """ 

105 Receive raw query and act upon it somehow. 

106 Args: 

107 query (Any): query in native format (str for sql databases, 

108 api's json etc) 

109 Returns: 

110 HandlerResponse 

111 """ 

112 method_name, params = FuncParser().from_string(query) 

113 

114 df = self.call_application_api(method_name, params) 

115 

116 return Response(RESPONSE_TYPE.TABLE, data_frame=df) 

117 

118 def get_accounts(self, params: dict = None) -> DataFrame: 

119 """ 

120 Get accounts 

121 Args: 

122 params (dict): query parameters 

123 Returns: 

124 DataFrame 

125 """ 

126 service = self.connect() 

127 page_token = None 

128 accounts = pd.DataFrame(columns=self.accounts.get_columns()) 

129 if params["account_id"]: 

130 result = service.accounts().get(merchantId=self.merchant_id, accountId=params["account_id"]).execute() 

131 accounts = pd.DataFrame(result, columns=self.accounts.get_columns()) 

132 return accounts 

133 while True: 

134 result = service.accounts().list(merchantId=self.merchant_id, page_token=page_token, **params).execute() 

135 accounts = pd.concat( 

136 [accounts, pd.DataFrame(result["resources"], columns=self.accounts.get_columns())], ignore_index=True 

137 ) 

138 page_token = result.get("nextPageToken") 

139 if not page_token: 

140 break 

141 

142 if params["startId"] and params["endId"]: 

143 start_id = int(params["startId"]) 

144 end_id = int(params["endId"]) 

145 elif params["startId"]: 

146 start_id = int(params["startId"]) 

147 end_id = start_id + 10 

148 elif params["endId"]: 

149 end_id = int(params["endId"]) 

150 start_id = end_id - 10 

151 else: 

152 raise Exception("startId or endId must be specified") 

153 

154 accounts = accounts.drop(accounts[(accounts["id"] < start_id) | (accounts["id"] > end_id)].index) 

155 

156 return accounts 

157 

158 def delete_accounts(self, params: dict = None) -> DataFrame: 

159 """ 

160 Delete accounts 

161 Args: 

162 params (dict): query parameters 

163 Returns: 

164 DataFrame 

165 """ 

166 service = self.connect() 

167 args = {} 

168 if params["force"]: 

169 args = {"force": params["force"]} 

170 if params["accountId"]: 

171 result = ( 

172 service.accounts().delete(merchantId=self.merchant_id, accountId=params["accountId"], **args).execute() 

173 ) 

174 return result 

175 else: 

176 df = pd.DataFrame(columns=["accountId", "status"]) 

177 if not params["startId"]: 

178 start_id = int(params["endId"]) - 10 

179 elif not params["endId"]: 

180 end_id = int(params["startId"]) + 10 

181 else: 

182 start_id = int(params["startId"]) 

183 end_id = int(params["endId"]) 

184 

185 for i in range(start_id, end_id): 

186 service.accounts().delete(merchantId=self.merchant_id, accountId=i, **args).execute() 

187 df = pd.concat([df, pd.DataFrame([{"accountId": str(i), "status": "deleted"}])], ignore_index=True) 

188 return df 

189 

190 def get_orders(self, params: dict = None) -> DataFrame: 

191 """ 

192 Get orders 

193 Args: 

194 params (dict): query parameters 

195 Returns: 

196 DataFrame 

197 """ 

198 service = self.connect() 

199 page_token = None 

200 orders = pd.DataFrame(columns=self.orders.get_columns()) 

201 args = { 

202 key: value 

203 for key, value in params.items() 

204 if key in ["maxResults", "statuses", "acknowledged", "placedDateStart", "placedDateEnd", "orderBy"] 

205 and value is not None 

206 } 

207 if params["order_id"]: 

208 result = service.orders().get(merchantId=self.merchant_id, orderId=params["order_id"], **args).execute() 

209 orders = pd.DataFrame(result, columns=self.orders.get_columns()) 

210 return orders 

211 while True: 

212 result = service.orders().list(merchantId=self.merchant_id, page_token=page_token, **args).execute() 

213 orders = pd.concat( 

214 [orders, pd.DataFrame(result["resources"], columns=self.orders.get_columns())], ignore_index=True 

215 ) 

216 page_token = result.get("nextPageToken") 

217 if not page_token: 

218 break 

219 

220 if params["startId"] and params["endId"]: 

221 start_id = int(params["startId"]) 

222 end_id = int(params["endId"]) 

223 elif params["startId"]: 

224 start_id = int(params["startId"]) 

225 end_id = start_id + 10 

226 elif params["endId"]: 

227 end_id = int(params["endId"]) 

228 start_id = end_id - 10 

229 else: 

230 raise Exception("startId or endId must be specified") 

231 

232 orders = orders.drop(orders[(orders["id"] < start_id) | (orders["id"] > end_id)].index) 

233 

234 return orders 

235 

236 def delete_orders(self, params: dict = None) -> DataFrame: 

237 """ 

238 Delete orders 

239 Args: 

240 params (dict): query parameters 

241 Returns: 

242 DataFrame 

243 """ 

244 service = self.connect() 

245 if params["order_id"]: 

246 result = service.orders().delete(merchantId=self.merchant_id, orderId=params["order_id"]).execute() 

247 return result 

248 else: 

249 df = pd.DataFrame(columns=["orderId", "status"]) 

250 if not params["startId"]: 

251 start_id = int(params["endId"]) - 10 

252 elif not params["endId"]: 

253 end_id = int(params["startId"]) + 10 

254 else: 

255 start_id = int(params["startId"]) 

256 end_id = int(params["endId"]) 

257 

258 for i in range(start_id, end_id): 

259 service.orders().delete(merchantId=self.merchant_id, orderId=i).execute() 

260 df = pd.concat([df, pd.DataFrame([{"orderId": str(i), "status": "deleted"}])], ignore_index=True) 

261 return df 

262 

263 def get_products(self, params: dict = None) -> DataFrame: 

264 """ 

265 Get products 

266 Args: 

267 params (dict): query parameters 

268 Returns: 

269 DataFrame 

270 """ 

271 service = self.connect() 

272 page_token = None 

273 products = pd.DataFrame(columns=self.products.get_columns()) 

274 if params["product_id"]: 

275 result = service.products().get(merchantId=self.merchant_id, productId=params["product_id"]).execute() 

276 products = pd.DataFrame(result, columns=self.products.get_columns()) 

277 return products 

278 while True: 

279 result = service.products().list(merchantId=self.merchant_id, page_token=page_token).execute() 

280 products = pd.concat( 

281 [products, pd.DataFrame(result["resources"], columns=self.products.get_columns())], ignore_index=True 

282 ) 

283 page_token = result.get("nextPageToken") 

284 if not page_token: 

285 break 

286 

287 if params["startId"] and params["endId"]: 

288 start_id = int(params["startId"]) 

289 end_id = int(params["endId"]) 

290 elif params["startId"]: 

291 start_id = int(params["startId"]) 

292 end_id = start_id + 10 

293 elif params["endId"]: 

294 end_id = int(params["endId"]) 

295 start_id = end_id - 10 

296 else: 

297 raise Exception("startId or endId must be specified") 

298 

299 products = products.drop(products[(products["id"] < start_id) | (products["id"] > end_id)].index) 

300 

301 return products 

302 

303 def update_products(self, params: dict = None) -> DataFrame: 

304 """ 

305 Update products 

306 Args: 

307 params (dict): query parameters 

308 Returns: 

309 DataFrame 

310 """ 

311 body = {key: value for key, value in params.items() if key in self.products.get_columns()} 

312 service = self.connect() 

313 if params["product_id"]: 

314 result = ( 

315 service.products() 

316 .update( 

317 merchantId=self.merchant_id, 

318 productId=params["product_id"], 

319 updateMask=params["updateMask"], 

320 body=body, 

321 ) 

322 .execute() 

323 ) 

324 

325 return result 

326 else: 

327 df = pd.DataFrame(columns=["productId", "status"]) 

328 if not params["startId"]: 

329 start_id = int(params["endId"]) - 10 

330 elif not params["endId"]: 

331 end_id = int(params["startId"]) + 10 

332 else: 

333 start_id = int(params["startId"]) 

334 end_id = int(params["endId"]) 

335 

336 for i in range(start_id, end_id): 

337 service.products().update( 

338 merchantId=self.merchant_id, productId=i, updateMask=params["updateMask"], body=body 

339 ).execute() 

340 df = pd.concat([df, pd.DataFrame([{"productId": str(i), "status": "updated"}])], ignore_index=True) 

341 return df 

342 

343 def delete_products(self, params: dict = None) -> DataFrame: 

344 """ 

345 Delete products 

346 Args: 

347 params (dict): query parameters 

348 Returns: 

349 DataFrame 

350 """ 

351 service = self.connect() 

352 args = {key: value for key, value in params.items() if key in ["feedId"] and value is not None} 

353 if params["product_id"]: 

354 result = ( 

355 service.products().delete(merchantId=self.merchant_id, productId=params["product_id"], **args).execute() 

356 ) 

357 return result 

358 else: 

359 df = pd.DataFrame(columns=["productId", "status"]) 

360 if not params["startId"]: 

361 start_id = int(params["endId"]) - 10 

362 elif not params["endId"]: 

363 end_id = int(params["startId"]) + 10 

364 else: 

365 start_id = int(params["startId"]) 

366 end_id = int(params["endId"]) 

367 

368 for i in range(start_id, end_id): 

369 service.products().delete(merchantId=self.merchant_id, productId=i, **args).execute() 

370 df = pd.concat([df, pd.DataFrame([{"productId": str(i), "status": "deleted"}])], ignore_index=True) 

371 return df 

372 

373 def call_application_api(self, method_name: str = None, params: dict = None) -> DataFrame: 

374 """ 

375 Call Google Search API and map the data to pandas DataFrame 

376 Args: 

377 method_name (str): method name 

378 params (dict): query parameters 

379 Returns: 

380 DataFrame 

381 """ 

382 if method_name == "get_accounts": 

383 return self.get_accounts(params) 

384 elif method_name == "delete_accounts": 

385 return self.delete_accounts(params) 

386 elif method_name == "get_orders": 

387 return self.get_orders(params) 

388 elif method_name == "delete_orders": 

389 return self.delete_orders(params) 

390 elif method_name == "get_products": 

391 return self.get_products(params) 

392 elif method_name == "update_products": 

393 return self.update_products(params) 

394 elif method_name == "delete_products": 

395 return self.delete_products(params) 

396 else: 

397 raise NotImplementedError(f"Unknown method {method_name}")