Coverage for mindsdb / integrations / handlers / webz_handler / webz_handler.py: 0%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2import time 

3from typing import Any, Dict 

4 

5import pandas as pd 

6import webzio 

7from dotty_dict import dotty 

8from mindsdb_sql_parser import parse_sql 

9 

10from mindsdb.integrations.handlers.webz_handler.webz_tables import ( 

11 WebzPostsTable, 

12 WebzReviewsTable, 

13) 

14from mindsdb.integrations.libs.api_handler import APIHandler 

15from mindsdb.integrations.libs.response import HandlerResponse as Response 

16from mindsdb.integrations.libs.response import HandlerStatusResponse as StatusResponse 

17from mindsdb.utilities import log 

18from mindsdb.utilities.config import Config 

19 

20logger = log.getLogger(__name__) 

21 

22 

class WebzHandler(APIHandler):
    """A class for handling connections and interactions with the Webz API."""

    # Upper bound, in seconds, on the total time one call_webz_api() may spend paging.
    API_CALL_EXEC_LIMIT_SECONDS = 60
    # Connection arguments that _read_connection_args() looks up.
    AVAILABLE_CONNECTION_ARGUMENTS = ["token"]

    def __init__(self, name: str = None, **kwargs):
        """Registers all tables and prepares the handler for an API connection.

        Args:
            name (str): The handler name to use
            **kwargs: May contain ``connection_data``, a dict of connection
                arguments (see ``AVAILABLE_CONNECTION_ARGUMENTS``).
        """
        super().__init__(name)

        # `or {}` also covers an explicit connection_data=None, which would
        # otherwise fail when unpacked with ** below.
        args = kwargs.get("connection_data") or {}
        self.connection_args = self._read_connection_args(name, **args)

        self.client = None
        self.is_connected = False
        self.max_page_size = 100

        self._register_table(WebzPostsTable.TABLE_NAME, WebzPostsTable(self))
        self._register_table(WebzReviewsTable.TABLE_NAME, WebzReviewsTable(self))

    def _read_connection_args(self, name: str = None, **kwargs) -> Dict[str, Any]:
        """Read the connection arguments by following the order of precedence below:

        1. PARAMETERS object
        2. Environment Variables (``<NAME>_<ARGUMENT>``, upper-cased)
        3. MindsDB Config File (section ``<name>_handler``, lower-cased)

        Args:
            name (str): Handler name used to build the environment variable
                prefix and the config section name. When it is None or empty,
                only the explicitly passed parameters are considered.
            **kwargs: Explicitly supplied connection parameters.

        Returns:
            Dict[str, Any]: The arguments from AVAILABLE_CONNECTION_ARGUMENTS
            that could be resolved; unresolved arguments are omitted.
        """
        filtered_args = {}

        if name:
            env_prefix = f"{name.upper()}_"
            handler_config = Config().get(f"{name.lower()}_handler", {}) or {}
        else:
            # Without a name there is nothing to derive the env var / config
            # keys from, so fall back to the explicit parameters only.
            env_prefix = None
            handler_config = {}

        for arg in type(self).AVAILABLE_CONNECTION_ARGUMENTS:
            env_var = f"{env_prefix}{arg.upper()}" if env_prefix else None
            if arg in kwargs:
                filtered_args[arg] = kwargs[arg]
            elif env_var is not None and env_var in os.environ:
                filtered_args[arg] = os.environ[env_var]
            elif arg in handler_config:
                filtered_args[arg] = handler_config[arg]
        return filtered_args

    def connect(self) -> object:
        """Set up the connection required by the handler.

        Configures the ``webzio`` module with the stored token and keeps a
        reference to it as the client. Switches ``self.is_connected`` to True.

        Returns:
            The configured ``webzio`` module, used as the API client.

        Raises:
            KeyError: If no ``token`` connection argument was resolved.
        """
        if self.is_connected and self.client is not None:
            return self.client

        webzio.config(token=self.connection_args["token"])
        self.client = webzio
        self.is_connected = True
        return self.client

    def check_connection(self) -> StatusResponse:
        """Check connection to the handler by issuing a minimal query.

        Returns:
            HandlerStatusResponse: ``success`` is True when the probe query
            ran without raising; otherwise ``error_message`` is populated.
        """
        response = StatusResponse(False)
        try:
            webzio_client = self.connect()
            webzio_client.query("filterWebContent", {"q": "AI", "size": 1})
            response.success = True
        except Exception as e:
            response.error_message = f"Error connecting to Webz api: {e}."
            logger.error(response.error_message)

        if not response.success:
            # A failed probe invalidates any previously established connection.
            self.is_connected = False

        return response

    def native_query(self, query: str = None) -> Response:
        """Receive raw query and act upon it somehow.

        Args:
            query (str): SQL text; it is parsed and delegated to ``self.query``.

        Returns:
            HandlerResponse
        """
        ast = parse_sql(query)
        return self.query(ast)

    def _parse_item(self, item, output_colums):
        """Flatten one API item into a row dict.

        Args:
            item: A (possibly nested) mapping returned by the API.
            output_colums: Dot-separated field paths to extract from ``item``.

        Returns:
            dict: One entry per requested field; dots in the field path are
            replaced with ``__`` to form the column name.
        """
        dotted_item = dotty(item)
        return {
            field.replace(".", "__"): dotted_item[field] for field in output_colums
        }

    def call_webz_api(
        self, method_name: str = None, params: Dict = None
    ) -> pd.DataFrame:
        """Calls the API method with the given params.

        Returns results as a pandas DataFrame.

        Pages through the results until ``params["size"]`` rows have been
        collected or the API returns an empty page. When no ``size`` is given,
        pages of ``self.max_page_size`` are requested until the API returns an
        empty page. The caller's ``params`` dict is not modified.

        Args:
            method_name (str): Method name to call; must be a registered
                table name, and is also the key of the result list in the
                API response.
            params (Dict): Params to pass to the API call

        Returns:
            pd.DataFrame: One row per item, with the table's OUTPUT_COLUMNS.

        Raises:
            KeyError: If ``method_name`` is not a registered table.
            RuntimeError: If paging exceeds API_CALL_EXEC_LIMIT_SECONDS.
        """
        table = self._tables[method_name]
        client = self.connect()

        # Work on a private copy: we rewrite "q" and "size" below.
        request_params = dict(params or {})

        # GET param q is mandatory, so in order to collect all data,
        # it's needed to use as a query an asterisk (*)
        request_params.setdefault("q", "*")

        count_results = request_params.get("size")

        data = []
        first_call = True
        limit_exec_time = time.time() + type(self).API_CALL_EXEC_LIMIT_SECONDS

        while count_results is None or len(data) < count_results:
            if time.time() > limit_exec_time:
                raise RuntimeError("Handler request timeout error")

            if count_results is None:
                request_params["size"] = self.max_page_size
            else:
                request_params["size"] = min(
                    count_results - len(data), self.max_page_size
                )

            logger.debug(
                f"Calling Webz API: {table.ENDPOINT} with params ({request_params})"
            )

            # The first request carries the params; later pages are fetched
            # through the client's own continuation call.
            if first_call:
                output = client.query(table.ENDPOINT, request_params)
                first_call = False
            else:
                output = client.get_next()

            items = output.get(method_name) or []
            if not items:
                # No more results available: stop instead of spinning until
                # the timeout fires.
                break

            data.extend(self._parse_item(item, table.OUTPUT_COLUMNS) for item in items)

        if count_results is not None:
            # A page may overshoot the requested total; trim the surplus.
            data = data[:count_results]

        return pd.DataFrame(data)