Coverage for mindsdb / integrations / handlers / google_search_handler / google_search_handler.py: 0%
101 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import json
2import pandas as pd
4from pandas import DataFrame
5from google.auth.transport.requests import Request
6from google.oauth2.credentials import Credentials
7from googleapiclient.discovery import build
8from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE
9from .google_search_tables import SearchAnalyticsTable, SiteMapsTable
10from mindsdb.integrations.libs.api_handler import APIHandler, FuncParser
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13 HandlerResponse as Response,
14)
15from mindsdb.utilities import log
17logger = log.getLogger(__name__)
class GoogleSearchConsoleHandler(APIHandler):
    """
    A class for handling connections and interactions with the Google Search Console API.

    The handler registers two tables ("Analytics" and "Sitemaps") and exposes a
    small set of API operations that can also be invoked by name through
    ``native_query`` / ``call_application_api``.
    """

    name = "google_search"

    # Name under which the OAuth token is cached in the handler's file storage.
    _TOKEN_FILE = "token_search.json"

    def __init__(self, name: str, **kwargs):
        """
        Initialize the Google Search Console API handler.
        Args:
            name (str): name of the handler
            kwargs (dict): additional arguments; ``file_storage`` is required,
                ``connection_data`` is optional
        """
        super().__init__(name)
        self.token = None
        self.service = None
        # Treat an explicit ``connection_data=None`` the same as a missing one.
        self.connection_data = kwargs.get("connection_data") or {}
        self.fs_storage = kwargs["file_storage"]
        self.credentials_file = self.connection_data.get("credentials", None)
        self.credentials = None
        self.scopes = [
            "https://www.googleapis.com/auth/webmasters.readonly",
            "https://www.googleapis.com/auth/webmasters",
        ]
        self.is_connected = False

        analytics = SearchAnalyticsTable(self)
        self.analytics = analytics
        self._register_table("Analytics", analytics)

        sitemaps = SiteMapsTable(self)
        self.sitemaps = sitemaps
        self._register_table("Sitemaps", sitemaps)

    def _load_cached_credentials(self):
        """
        Load credentials from the token cached in file storage.
        Returns:
            Credentials built from the cached token, or None when the token
            cannot be read or parsed.
        """
        try:
            token_bytes = self.fs_storage.file_get(self._TOKEN_FILE)
            token_info = json.loads(token_bytes.decode())
            return Credentials.from_authorized_user_info(info=token_info, scopes=self.scopes)
        except Exception as e:
            # Best effort: a missing or unreadable cache is not an error, the
            # caller falls back to the configured credentials file.
            logger.debug(f"No usable cached Google Search Console token: {e}")
            return None

    def connect(self):
        """
        Build (or reuse) the Google Search Console API service object.

        When a credentials file is configured, the cached token is tried first;
        an expired token with a refresh token is refreshed, otherwise the
        credentials are read from the configured file. Newly obtained
        credentials are written back to file storage for the next run.
        Returns:
            The service object created by ``build("webmasters", "v3", ...)``.
        """
        if self.is_connected is True:
            return self.service

        # NOTE(review): the text this block was restored from had lost its
        # indentation, so the nesting of the token-save and build() steps is
        # reconstructed - confirm against version control.
        if self.credentials_file:
            self.credentials = self._load_cached_credentials()

            if not self.credentials or not self.credentials.valid:
                if self.credentials and self.credentials.expired and self.credentials.refresh_token:
                    self.credentials.refresh(Request())
                else:
                    self.credentials = Credentials.from_authorized_user_file(self.credentials_file, scopes=self.scopes)
                # Save the credentials for the next run
                json_str = self.credentials.to_json()
                self.fs_storage.file_set(self._TOKEN_FILE, json_str.encode())

        self.service = build("webmasters", "v3", credentials=self.credentials)
        # Remember the successful build so later calls reuse the service
        # instead of rebuilding it on every API request.
        self.is_connected = True
        return self.service

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler
        Returns:
            HandlerStatusResponse: ``success`` is True when ``connect()`` did
            not raise; otherwise ``error_message`` holds the error text.
        """
        response = StatusResponse(False)

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to Google Search Console API: {e}!")
            # Store the message text rather than the exception object.
            response.error_message = str(e)

        self.is_connected = response.success
        return response

    def native_query(self, query: str = None) -> Response:
        """
        Receive raw query and act upon it somehow.
        Args:
            query (Any): query in native format; parsed by ``FuncParser`` into
                a method name and its parameters
        Returns:
            HandlerResponse
        """
        method_name, params = FuncParser().from_string(query)

        df = self.call_application_api(method_name, params)

        return Response(RESPONSE_TYPE.TABLE, data_frame=df)

    def get_traffic_data(self, params: dict = None) -> DataFrame:
        """
        Get traffic data from Google Search Console API
        Args:
            params (dict): query parameters; ``siteUrl`` is required
        Returns:
            DataFrame
        Raises:
            KeyError: if ``siteUrl`` is missing from ``params``
        """
        params = params or {}
        service = self.connect()
        # NOTE(review): the API reference documents camelCase body fields
        # (startDate, endDate, rowLimit, aggregationType); the keys below are
        # forwarded exactly as received - confirm the service accepts them.
        accepted_params = ("start_date", "end_date", "dimensions", "row_limit", "aggregation_type")
        request_body = {
            key: value for key, value in params.items() if key in accepted_params and value is not None
        }
        response = service.searchanalytics().query(siteUrl=params["siteUrl"], body=request_body).execute()
        # A response without a "rows" entry yields an empty frame instead of a KeyError.
        rows = (response or {}).get("rows", [])
        return pd.DataFrame(rows, columns=self.analytics.get_columns())

    def get_sitemaps(self, params: dict = None) -> DataFrame:
        """
        Get sitemaps data from Google Search Console API
        Args:
            params (dict): query parameters; ``siteUrl`` is required,
                ``sitemapIndex`` and ``row_limit`` are optional
        Returns:
            DataFrame
        Raises:
            KeyError: if ``siteUrl`` is missing from ``params``
        """
        params = params or {}
        service = self.connect()

        list_kwargs = {"siteUrl": params["siteUrl"]}
        sitemap_index = params.get("sitemapIndex")
        if sitemap_index:
            list_kwargs["sitemapIndex"] = sitemap_index
        response = service.sitemaps().list(**list_kwargs).execute()

        # A response without a "sitemap" entry yields an empty frame instead of a KeyError.
        entries = (response or {}).get("sitemap", [])
        df = pd.DataFrame(entries, columns=self.sitemaps.get_columns())

        # Get as many sitemaps as indicated by the row_limit parameter.
        # Slicing already stops at the end of the frame, so no clamping is needed.
        row_limit = params.get("row_limit")
        if row_limit:
            df = df[:row_limit]

        return df

    def submit_sitemap(self, params: dict = None) -> DataFrame:
        """
        Submit sitemap to Google Search Console API
        Args:
            params (dict): query parameters; ``siteUrl`` and ``feedpath`` are required
        Returns:
            DataFrame built directly from the API response
        Raises:
            KeyError: if ``siteUrl`` or ``feedpath`` is missing from ``params``
        """
        params = params or {}
        service = self.connect()
        response = service.sitemaps().submit(siteUrl=params["siteUrl"], feedpath=params["feedpath"]).execute()
        return pd.DataFrame(response, columns=self.sitemaps.get_columns())

    def delete_sitemap(self, params: dict = None) -> DataFrame:
        """
        Delete sitemap from Google Search Console API
        Args:
            params (dict): query parameters; ``siteUrl`` and ``feedpath`` are required
        Returns:
            DataFrame built directly from the API response
        Raises:
            KeyError: if ``siteUrl`` or ``feedpath`` is missing from ``params``
        """
        params = params or {}
        service = self.connect()
        response = service.sitemaps().delete(siteUrl=params["siteUrl"], feedpath=params["feedpath"]).execute()
        return pd.DataFrame(response, columns=self.sitemaps.get_columns())

    def call_application_api(self, method_name: str = None, params: dict = None) -> DataFrame:
        """
        Call Google Search Console API and map the data to pandas DataFrame
        Args:
            method_name (str): method name
            params (dict): query parameters
        Returns:
            DataFrame
        Raises:
            NotImplementedError: if ``method_name`` is not a supported operation
        """
        operations = {
            "get_traffic_data": self.get_traffic_data,
            "get_sitemaps": self.get_sitemaps,
            "submit_sitemap": self.submit_sitemap,
            "delete_sitemap": self.delete_sitemap,
        }
        operation = operations.get(method_name)
        if operation is None:
            raise NotImplementedError(f"Unknown method {method_name}")
        return operation(params)