Coverage for mindsdb / integrations / handlers / google_search_handler / google_search_handler.py: 0%

101 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import json 

2import pandas as pd 

3 

4from pandas import DataFrame 

5from google.auth.transport.requests import Request 

6from google.oauth2.credentials import Credentials 

7from googleapiclient.discovery import build 

8from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE 

9from .google_search_tables import SearchAnalyticsTable, SiteMapsTable 

10from mindsdb.integrations.libs.api_handler import APIHandler, FuncParser 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13 HandlerResponse as Response, 

14) 

15from mindsdb.utilities import log 

16 

# Module-level logger named after this module, obtained through mindsdb.utilities.log.
17logger = log.getLogger(__name__) 

18 

19 

class GoogleSearchConsoleHandler(APIHandler):
    """
    A class for handling connections and interactions with the Google Search Console API.

    Registers two tables, ``Analytics`` and ``Sitemaps``, and exposes the
    functions listed in ``call_application_api`` through ``native_query``.
    """

    name = "google_search"

    def __init__(self, name: str, **kwargs):
        """
        Initialize the Google Search Console API handler.
        Args:
            name (str): name of the handler
            kwargs (dict): additional arguments. ``file_storage`` is required
                (used to cache the OAuth token as ``token_search.json``);
                ``connection_data`` is optional and may carry a ``credentials``
                entry pointing at an authorized-user credentials file.
        """
        super().__init__(name)
        self.token = None
        self.service = None
        self.connection_data = kwargs.get("connection_data", {})
        self.fs_storage = kwargs["file_storage"]
        self.credentials_file = self.connection_data.get("credentials", None)
        self.credentials = None
        self.scopes = [
            "https://www.googleapis.com/auth/webmasters.readonly",
            "https://www.googleapis.com/auth/webmasters",
        ]
        self.is_connected = False

        analytics = SearchAnalyticsTable(self)
        self.analytics = analytics
        self._register_table("Analytics", analytics)

        sitemaps = SiteMapsTable(self)
        self.sitemaps = sitemaps
        self._register_table("Sitemaps", sitemaps)

    def connect(self):
        """
        Build (or reuse) the Google Search Console API client.

        When a credentials file is configured, a token cached in the handler's
        file storage is tried first; it is refreshed if expired, and otherwise
        the credentials file itself is loaded. The resulting credentials are
        written back to storage for the next run.

        Returns:
            The ``webmasters`` v3 service object returned by ``build``.
        """
        if self.is_connected is True:
            return self.service

        if self.credentials_file:
            try:
                cached_token = self.fs_storage.file_get("token_search.json")
                self.credentials = Credentials.from_authorized_user_info(
                    info=json.loads(cached_token.decode()), scopes=self.scopes
                )
            except Exception as e:
                # Best effort: a missing or unreadable cached token simply
                # means we fall back to the credentials file below.
                logger.debug(f"No usable cached Google Search Console token: {e}")
                self.credentials = None

            if not self.credentials or not self.credentials.valid:
                if self.credentials and self.credentials.expired and self.credentials.refresh_token:
                    self.credentials.refresh(Request())
                else:
                    self.credentials = Credentials.from_authorized_user_file(
                        self.credentials_file, scopes=self.scopes
                    )

            # Save the credentials for the next run
            self.fs_storage.file_set("token_search.json", self.credentials.to_json().encode())

        self.service = build("webmasters", "v3", credentials=self.credentials)
        # Mark the handler as connected so later calls reuse the client
        # instead of re-reading the token and rebuilding the service.
        self.is_connected = True
        return self.service

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler
        Returns:
            HandlerStatusResponse: ``success`` is True when ``connect()`` did not
            raise; otherwise ``error_message`` holds the error text.
        """
        response = StatusResponse(False)

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to Google Search Console API: {e}!")
            # Store the text, not the exception object, so the message is a plain string.
            response.error_message = str(e)

        self.is_connected = response.success
        return response

    def native_query(self, query: str = None) -> Response:
        """
        Receive raw query and act upon it somehow.
        Args:
            query (Any): query in native format (str for sql databases,
                api's json etc); parsed by ``FuncParser`` into a method name
                and its parameters
        Returns:
            HandlerResponse
        """
        method_name, params = FuncParser().from_string(query)

        df = self.call_application_api(method_name, params)

        return Response(RESPONSE_TYPE.TABLE, data_frame=df)

    def get_traffic_data(self, params: dict = None) -> DataFrame:
        """
        Get traffic data from Google Search Console API
        Args:
            params (dict): query parameters; ``siteUrl`` is required, the
                accepted optional keys are forwarded as the request body
        Returns:
            DataFrame: one row per returned entry, empty when there is no data
        """
        service = self.connect()
        accepted_params = ["start_date", "end_date", "dimensions", "row_limit", "aggregation_type"]
        search_analytics_query_request = {
            key: value for key, value in params.items() if key in accepted_params and value is not None
        }
        response = (
            service.searchanalytics().query(siteUrl=params["siteUrl"], body=search_analytics_query_request).execute()
        )
        # The API omits "rows" entirely when the query matches no data.
        rows = response.get("rows", [])
        return pd.DataFrame(rows, columns=self.analytics.get_columns())

    def get_sitemaps(self, params: dict = None) -> DataFrame:
        """
        Get sitemaps data from Google Search Console API
        Args:
            params (dict): query parameters; ``siteUrl`` is required,
                ``sitemapIndex`` and ``row_limit`` are optional
        Returns:
            DataFrame: the listed sitemaps, truncated to ``row_limit`` when given
        """
        service = self.connect()

        list_kwargs = {"siteUrl": params["siteUrl"]}
        sitemap_index = params.get("sitemapIndex")
        if sitemap_index:
            list_kwargs["sitemapIndex"] = sitemap_index
        response = service.sitemaps().list(**list_kwargs).execute()

        # "sitemap" is absent from the response when nothing is listed.
        df = pd.DataFrame(response.get("sitemap", []), columns=self.sitemaps.get_columns())

        # Get as many sitemaps as indicated by the row_limit parameter
        # (slicing already copes with a limit larger than the frame).
        row_limit = params.get("row_limit")
        if row_limit:
            df = df[:row_limit]

        return df

    def _sitemap_response_to_frame(self, response) -> DataFrame:
        """Convert a sitemap API response (possibly empty) into a DataFrame with the Sitemaps columns."""
        if not response:
            # Empty body (None / "" / {}): return an empty frame instead of
            # letting the DataFrame constructor fail.
            records = []
        elif isinstance(response, dict):
            # A single resource: wrap it so it becomes one row.
            records = [response]
        else:
            records = response
        return pd.DataFrame(records, columns=self.sitemaps.get_columns())

    def submit_sitemap(self, params: dict = None) -> DataFrame:
        """
        Submit sitemap to Google Search Console API
        Args:
            params (dict): query parameters; ``siteUrl`` and ``feedpath`` are required
        Returns:
            DataFrame: the API response mapped to the Sitemaps columns
            (empty when the API returns no body)
        """
        service = self.connect()
        response = service.sitemaps().submit(siteUrl=params["siteUrl"], feedpath=params["feedpath"]).execute()
        return self._sitemap_response_to_frame(response)

    def delete_sitemap(self, params: dict = None) -> DataFrame:
        """
        Delete sitemap from Google Search Console API
        Args:
            params (dict): query parameters; ``siteUrl`` and ``feedpath`` are required
        Returns:
            DataFrame: the API response mapped to the Sitemaps columns
            (empty when the API returns no body)
        """
        service = self.connect()
        response = service.sitemaps().delete(siteUrl=params["siteUrl"], feedpath=params["feedpath"]).execute()
        return self._sitemap_response_to_frame(response)

    def call_application_api(self, method_name: str = None, params: dict = None) -> DataFrame:
        """
        Call Google Search Console API and map the data to pandas DataFrame
        Args:
            method_name (str): method name; one of ``get_traffic_data``,
                ``get_sitemaps``, ``submit_sitemap``, ``delete_sitemap``
            params (dict): query parameters
        Returns:
            DataFrame
        Raises:
            NotImplementedError: if ``method_name`` is not a supported method
        """
        dispatch = {
            "get_traffic_data": self.get_traffic_data,
            "get_sitemaps": self.get_sitemaps,
            "submit_sitemap": self.submit_sitemap,
            "delete_sitemap": self.delete_sitemap,
        }
        api_method = dispatch.get(method_name)
        if api_method is None:
            raise NotImplementedError(f"Unknown method {method_name}")
        return api_method(params)