Coverage for mindsdb / integrations / handlers / webz_handler / webz_handler.py: 0%
92 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2import time
3from typing import Any, Dict
5import pandas as pd
6import webzio
7from dotty_dict import dotty
8from mindsdb_sql_parser import parse_sql
10from mindsdb.integrations.handlers.webz_handler.webz_tables import (
11 WebzPostsTable,
12 WebzReviewsTable,
13)
14from mindsdb.integrations.libs.api_handler import APIHandler
15from mindsdb.integrations.libs.response import HandlerResponse as Response
16from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse
17from mindsdb.utilities import log
18from mindsdb.utilities.config import Config
20logger = log.getLogger(__name__)
class WebzHandler(APIHandler):
    """A class for handling connections and interactions with the Webz API."""

    # Upper bound, in seconds, on how long a single call_webz_api() invocation
    # may keep paging before it raises a timeout error.
    API_CALL_EXEC_LIMIT_SECONDS = 60
    # Names of the connection arguments resolved by _read_connection_args().
    AVAILABLE_CONNECTION_ARGUMENTS = ["token"]

    def __init__(self, name: str = None, **kwargs):
        """Registers all tables and prepares the handler for an API connection.

        Args:
            name: (str): The handler name to use
            **kwargs: May contain ``connection_data``, a mapping holding the
                connection arguments listed in AVAILABLE_CONNECTION_ARGUMENTS.
        """
        super().__init__(name)

        # ``or {}`` also covers an explicit ``connection_data=None``, which
        # could not be unpacked with ``**`` below.
        args = kwargs.get("connection_data") or {}
        self.connection_args = self._read_connection_args(name, **args)

        self.client = None
        self.is_connected = False
        # Largest "size" value sent with a single API request.
        self.max_page_size = 100

        self._register_table(WebzPostsTable.TABLE_NAME, WebzPostsTable(self))
        self._register_table(WebzReviewsTable.TABLE_NAME, WebzReviewsTable(self))

    def _read_connection_args(self, name: str = None, **kwargs) -> Dict[str, Any]:
        """Read the connection arguments by following the order of precedence below:

        1. PARAMETERS object
        2. Environment Variables
        3. MindsDB Config File

        Environment variables are named ``<NAME>_<ARGUMENT>`` (upper-cased) and
        the config section is ``<name>_handler`` (lower-cased). When ``name``
        is None neither can be derived, so only the explicit parameters are
        consulted.

        Args:
            name (str): The handler name used to build the env/config keys
            **kwargs: Explicitly supplied connection arguments

        Returns:
            Dict with the arguments that could be resolved; arguments found in
            none of the sources are simply absent.
        """
        if name is None:
            env_prefix = None
            handler_config = {}
        else:
            env_prefix = f"{name.upper()}_"
            handler_config = Config().get(f"{name.lower()}_handler", {}) or {}

        filtered_args = {}
        for k in type(self).AVAILABLE_CONNECTION_ARGUMENTS:
            env_key = None if env_prefix is None else f"{env_prefix}{k.upper()}"
            if k in kwargs:
                filtered_args[k] = kwargs[k]
            elif env_key is not None and env_key in os.environ:
                filtered_args[k] = os.environ[env_key]
            elif k in handler_config:
                filtered_args[k] = handler_config[k]
        return filtered_args

    def connect(self) -> object:
        """Configure the webzio module with the stored token and return it.

        Stores the client on ``self.client`` and switches ``self.is_connected``
        to True. An already configured client is reused. Use
        check_connection() to verify the token with a real request.

        Returns:
            The ``webzio`` module acting as API client.

        Raises:
            KeyError: If no "token" connection argument was resolved.
        """
        if self.is_connected and self.client is not None:
            return self.client

        webzio.config(token=self.connection_args["token"])
        self.client = webzio
        self.is_connected = True
        return self.client

    def check_connection(self) -> StatusResponse:
        """Check connection to the handler

        Connects and issues a one-result ``filterWebContent`` query. Any
        exception is reported through the response's ``error_message``
        instead of being raised.

        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        try:
            webzio_client = self.connect()
            webzio_client.query("filterWebContent", {"q": "AI", "size": 1})
            response.success = True
        except Exception as e:
            response.error_message = f"Error connecting to Webz api: {e}."

        # connect() may have flagged the handler as connected before the
        # probe query failed; undo that.
        if response.success is False and self.is_connected is True:
            self.is_connected = False

        return response

    def native_query(self, query: str = None) -> Response:
        """Receive raw query and act upon it somehow.

        Args:
            query (Any): query in native format (str for sql databases,
                dict for mongo, api's json etc)

        Returns:
            HandlerResponse
        """
        ast = parse_sql(query)
        return self.query(ast)

    def _parse_item(self, item, output_colums):
        """Flatten one API result item into a single-level dict.

        Each entry of ``output_colums`` is looked up in ``item`` through a
        dotty wrapper (dotted names address nested keys) and stored under the
        same name with every "." replaced by "__".

        Args:
            item: One result item as returned by the API
            output_colums: Field names to extract

        Returns:
            Dict mapping the renamed fields to their values
        """
        dotted_item = dotty(item)
        return {field.replace(".", "__"): dotted_item[field] for field in output_colums}

    def call_webz_api(
        self, method_name: str = None, params: Dict = None
    ) -> pd.DataFrame:
        """Calls the API method with the given params.

        Returns results as a pandas DataFrame. Results are fetched page by
        page until ``params["size"]`` items were collected (when given), the
        API returns an empty page, or the execution time limit is hit.

        Args:
            method_name (str): Method name to call; must be the name of a
                registered table
            params (Dict): Params to pass to the API call. The dict is not
                modified.

        Returns:
            DataFrame with one row per collected item

        Raises:
            RuntimeError: If paging takes longer than
                API_CALL_EXEC_LIMIT_SECONDS
        """
        table_name = method_name
        table = self._tables[table_name]

        client = self.connect()

        # Work on a copy: the loop below rewrites "size" and may add "q",
        # which must not leak into the caller's dict. Also covers params=None.
        params = dict(params) if params else {}

        # Total number of items requested by the caller; None means no limit.
        count_results = params.get("size")

        # GET param q is mandatory, so in order to collect all data,
        # it's needed to use as a query an asterisk (*)
        params.setdefault("q", "*")

        data = []
        first_request = True
        limit_exec_time = time.time() + type(self).API_CALL_EXEC_LIMIT_SECONDS

        while True:
            if time.time() > limit_exec_time:
                raise RuntimeError("Handler request timeout error")

            if count_results is not None:
                left = count_results - len(data)
                if left <= 0:
                    # A negative remainder means we got more results than we
                    # need: drop the surplus from the end.
                    if left < 0:
                        data = data[:left]
                    break
                params["size"] = min(left, self.max_page_size)

            logger.debug(
                f"Calling Webz API: {table.ENDPOINT} with params ({params})"
            )

            # The first page comes from the query itself, later pages from
            # the client's continuation call.
            if first_request:
                output = client.query(table.ENDPOINT, params)
                first_request = False
            else:
                output = client.get_next()

            page = output.get(table_name, [])
            if not page:
                # An empty page means the API has nothing more to return.
                # Without this stop the loop would keep requesting until the
                # time limit raised.
                break

            data.extend(
                self._parse_item(item, table.OUTPUT_COLUMNS) for item in page
            )

        return pd.DataFrame(data)