Coverage for mindsdb / integrations / handlers / google_content_shopping_handler / google_content_shopping_handler.py: 0%
231 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import json
3import pandas as pd
4from pandas import DataFrame
5from google.auth.transport.requests import Request
6from google.oauth2 import service_account
7from google.oauth2.credentials import Credentials
8from googleapiclient.discovery import build
9from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE
10from .google_content_shopping_tables import AccountsTable, OrdersTable, ProductsTable
11from mindsdb.integrations.libs.api_handler import APIHandler, FuncParser
12from mindsdb.integrations.libs.response import (
13 HandlerStatusResponse as StatusResponse,
14 HandlerResponse as Response,
15)
16from mindsdb.utilities import log
18logger = log.getLogger(__name__)
21class GoogleContentShoppingHandler(APIHandler):
22 """
23 A class for handling connections and interactions with the Google Content API for Shopping.
24 """
26 name = "google_content_shopping"
28 def __init__(self, name: str, **kwargs):
29 """
30 Initialize the Google Content API for Shopping handler.
31 Args:
32 name (str): name of the handler
33 kwargs (dict): additional arguments
34 """
35 super().__init__(name)
36 self.token = None
37 self.service = None
38 self.connection_data = kwargs.get("connection_data", {})
39 self.fs_storage = kwargs["file_storage"]
40 self.credentials_file = self.connection_data.get("credentials", None)
41 self.merchant_id = self.connection_data.get("merchant_id", None)
42 self.credentials = None
43 self.scopes = ["https://www.googleapis.com/auth/content"]
44 self.is_connected = False
45 accounts = AccountsTable(self)
46 self.accounts = accounts
47 self._register_table("Accounts", accounts)
48 orders = OrdersTable(self)
49 self.orders = orders
50 self._register_table("Orders", orders)
51 products = ProductsTable(self)
52 self.products = products
53 self._register_table("Products", products)
55 def connect(self):
56 """
57 Set up any connections required by the handler
58 Should return output of check_connection() method after attempting
59 connection. Should switch self.is_connected.
60 Returns:
61 HandlerStatusResponse
62 """
63 if self.is_connected is True:
64 return self.service
65 if self.credentials_file:
66 try:
67 json_str_bytes = self.fs_storage.file_get("token_content.json")
68 json_str = json_str_bytes.decode()
69 self.credentials = Credentials.from_authorized_user_info(info=json.loads(json_str), scopes=self.scopes)
70 except Exception:
71 self.credentials = None
72 if not self.credentials or not self.credentials.valid:
73 if self.credentials and self.credentials.expired and self.credentials.refresh_token:
74 self.credentials.refresh(Request())
75 else:
76 self.credentials = service_account.Credentials.from_service_account_file(
77 self.credentials_file, scopes=self.scopes
78 )
79 # Save the credentials for the next run
80 json_str = self.credentials.to_json()
81 self.fs_storage.file_set("token_content.json", json_str.encode())
82 self.service = build("content", "v2.1", credentials=self.credentials)
83 return self.service
85 def check_connection(self) -> StatusResponse:
86 """
87 Check connection to the handler
88 Returns:
89 HandlerStatusResponse
90 """
91 response = StatusResponse(False)
93 try:
94 self.connect()
95 response.success = True
96 except Exception as e:
97 logger.error(f"Error connecting to Google Content API for Shopping: {e}!")
98 response.error_message = e
100 self.is_connected = response.success
101 return response
103 def native_query(self, query: str = None) -> Response:
104 """
105 Receive raw query and act upon it somehow.
106 Args:
107 query (Any): query in native format (str for sql databases,
108 api's json etc)
109 Returns:
110 HandlerResponse
111 """
112 method_name, params = FuncParser().from_string(query)
114 df = self.call_application_api(method_name, params)
116 return Response(RESPONSE_TYPE.TABLE, data_frame=df)
118 def get_accounts(self, params: dict = None) -> DataFrame:
119 """
120 Get accounts
121 Args:
122 params (dict): query parameters
123 Returns:
124 DataFrame
125 """
126 service = self.connect()
127 page_token = None
128 accounts = pd.DataFrame(columns=self.accounts.get_columns())
129 if params["account_id"]:
130 result = service.accounts().get(merchantId=self.merchant_id, accountId=params["account_id"]).execute()
131 accounts = pd.DataFrame(result, columns=self.accounts.get_columns())
132 return accounts
133 while True:
134 result = service.accounts().list(merchantId=self.merchant_id, page_token=page_token, **params).execute()
135 accounts = pd.concat(
136 [accounts, pd.DataFrame(result["resources"], columns=self.accounts.get_columns())], ignore_index=True
137 )
138 page_token = result.get("nextPageToken")
139 if not page_token:
140 break
142 if params["startId"] and params["endId"]:
143 start_id = int(params["startId"])
144 end_id = int(params["endId"])
145 elif params["startId"]:
146 start_id = int(params["startId"])
147 end_id = start_id + 10
148 elif params["endId"]:
149 end_id = int(params["endId"])
150 start_id = end_id - 10
151 else:
152 raise Exception("startId or endId must be specified")
154 accounts = accounts.drop(accounts[(accounts["id"] < start_id) | (accounts["id"] > end_id)].index)
156 return accounts
158 def delete_accounts(self, params: dict = None) -> DataFrame:
159 """
160 Delete accounts
161 Args:
162 params (dict): query parameters
163 Returns:
164 DataFrame
165 """
166 service = self.connect()
167 args = {}
168 if params["force"]:
169 args = {"force": params["force"]}
170 if params["accountId"]:
171 result = (
172 service.accounts().delete(merchantId=self.merchant_id, accountId=params["accountId"], **args).execute()
173 )
174 return result
175 else:
176 df = pd.DataFrame(columns=["accountId", "status"])
177 if not params["startId"]:
178 start_id = int(params["endId"]) - 10
179 elif not params["endId"]:
180 end_id = int(params["startId"]) + 10
181 else:
182 start_id = int(params["startId"])
183 end_id = int(params["endId"])
185 for i in range(start_id, end_id):
186 service.accounts().delete(merchantId=self.merchant_id, accountId=i, **args).execute()
187 df = pd.concat([df, pd.DataFrame([{"accountId": str(i), "status": "deleted"}])], ignore_index=True)
188 return df
190 def get_orders(self, params: dict = None) -> DataFrame:
191 """
192 Get orders
193 Args:
194 params (dict): query parameters
195 Returns:
196 DataFrame
197 """
198 service = self.connect()
199 page_token = None
200 orders = pd.DataFrame(columns=self.orders.get_columns())
201 args = {
202 key: value
203 for key, value in params.items()
204 if key in ["maxResults", "statuses", "acknowledged", "placedDateStart", "placedDateEnd", "orderBy"]
205 and value is not None
206 }
207 if params["order_id"]:
208 result = service.orders().get(merchantId=self.merchant_id, orderId=params["order_id"], **args).execute()
209 orders = pd.DataFrame(result, columns=self.orders.get_columns())
210 return orders
211 while True:
212 result = service.orders().list(merchantId=self.merchant_id, page_token=page_token, **args).execute()
213 orders = pd.concat(
214 [orders, pd.DataFrame(result["resources"], columns=self.orders.get_columns())], ignore_index=True
215 )
216 page_token = result.get("nextPageToken")
217 if not page_token:
218 break
220 if params["startId"] and params["endId"]:
221 start_id = int(params["startId"])
222 end_id = int(params["endId"])
223 elif params["startId"]:
224 start_id = int(params["startId"])
225 end_id = start_id + 10
226 elif params["endId"]:
227 end_id = int(params["endId"])
228 start_id = end_id - 10
229 else:
230 raise Exception("startId or endId must be specified")
232 orders = orders.drop(orders[(orders["id"] < start_id) | (orders["id"] > end_id)].index)
234 return orders
236 def delete_orders(self, params: dict = None) -> DataFrame:
237 """
238 Delete orders
239 Args:
240 params (dict): query parameters
241 Returns:
242 DataFrame
243 """
244 service = self.connect()
245 if params["order_id"]:
246 result = service.orders().delete(merchantId=self.merchant_id, orderId=params["order_id"]).execute()
247 return result
248 else:
249 df = pd.DataFrame(columns=["orderId", "status"])
250 if not params["startId"]:
251 start_id = int(params["endId"]) - 10
252 elif not params["endId"]:
253 end_id = int(params["startId"]) + 10
254 else:
255 start_id = int(params["startId"])
256 end_id = int(params["endId"])
258 for i in range(start_id, end_id):
259 service.orders().delete(merchantId=self.merchant_id, orderId=i).execute()
260 df = pd.concat([df, pd.DataFrame([{"orderId": str(i), "status": "deleted"}])], ignore_index=True)
261 return df
263 def get_products(self, params: dict = None) -> DataFrame:
264 """
265 Get products
266 Args:
267 params (dict): query parameters
268 Returns:
269 DataFrame
270 """
271 service = self.connect()
272 page_token = None
273 products = pd.DataFrame(columns=self.products.get_columns())
274 if params["product_id"]:
275 result = service.products().get(merchantId=self.merchant_id, productId=params["product_id"]).execute()
276 products = pd.DataFrame(result, columns=self.products.get_columns())
277 return products
278 while True:
279 result = service.products().list(merchantId=self.merchant_id, page_token=page_token).execute()
280 products = pd.concat(
281 [products, pd.DataFrame(result["resources"], columns=self.products.get_columns())], ignore_index=True
282 )
283 page_token = result.get("nextPageToken")
284 if not page_token:
285 break
287 if params["startId"] and params["endId"]:
288 start_id = int(params["startId"])
289 end_id = int(params["endId"])
290 elif params["startId"]:
291 start_id = int(params["startId"])
292 end_id = start_id + 10
293 elif params["endId"]:
294 end_id = int(params["endId"])
295 start_id = end_id - 10
296 else:
297 raise Exception("startId or endId must be specified")
299 products = products.drop(products[(products["id"] < start_id) | (products["id"] > end_id)].index)
301 return products
303 def update_products(self, params: dict = None) -> DataFrame:
304 """
305 Update products
306 Args:
307 params (dict): query parameters
308 Returns:
309 DataFrame
310 """
311 body = {key: value for key, value in params.items() if key in self.products.get_columns()}
312 service = self.connect()
313 if params["product_id"]:
314 result = (
315 service.products()
316 .update(
317 merchantId=self.merchant_id,
318 productId=params["product_id"],
319 updateMask=params["updateMask"],
320 body=body,
321 )
322 .execute()
323 )
325 return result
326 else:
327 df = pd.DataFrame(columns=["productId", "status"])
328 if not params["startId"]:
329 start_id = int(params["endId"]) - 10
330 elif not params["endId"]:
331 end_id = int(params["startId"]) + 10
332 else:
333 start_id = int(params["startId"])
334 end_id = int(params["endId"])
336 for i in range(start_id, end_id):
337 service.products().update(
338 merchantId=self.merchant_id, productId=i, updateMask=params["updateMask"], body=body
339 ).execute()
340 df = pd.concat([df, pd.DataFrame([{"productId": str(i), "status": "updated"}])], ignore_index=True)
341 return df
343 def delete_products(self, params: dict = None) -> DataFrame:
344 """
345 Delete products
346 Args:
347 params (dict): query parameters
348 Returns:
349 DataFrame
350 """
351 service = self.connect()
352 args = {key: value for key, value in params.items() if key in ["feedId"] and value is not None}
353 if params["product_id"]:
354 result = (
355 service.products().delete(merchantId=self.merchant_id, productId=params["product_id"], **args).execute()
356 )
357 return result
358 else:
359 df = pd.DataFrame(columns=["productId", "status"])
360 if not params["startId"]:
361 start_id = int(params["endId"]) - 10
362 elif not params["endId"]:
363 end_id = int(params["startId"]) + 10
364 else:
365 start_id = int(params["startId"])
366 end_id = int(params["endId"])
368 for i in range(start_id, end_id):
369 service.products().delete(merchantId=self.merchant_id, productId=i, **args).execute()
370 df = pd.concat([df, pd.DataFrame([{"productId": str(i), "status": "deleted"}])], ignore_index=True)
371 return df
373 def call_application_api(self, method_name: str = None, params: dict = None) -> DataFrame:
374 """
375 Call Google Search API and map the data to pandas DataFrame
376 Args:
377 method_name (str): method name
378 params (dict): query parameters
379 Returns:
380 DataFrame
381 """
382 if method_name == "get_accounts":
383 return self.get_accounts(params)
384 elif method_name == "delete_accounts":
385 return self.delete_accounts(params)
386 elif method_name == "get_orders":
387 return self.get_orders(params)
388 elif method_name == "delete_orders":
389 return self.delete_orders(params)
390 elif method_name == "get_products":
391 return self.get_products(params)
392 elif method_name == "update_products":
393 return self.update_products(params)
394 elif method_name == "delete_products":
395 return self.delete_products(params)
396 else:
397 raise NotImplementedError(f"Unknown method {method_name}")