Coverage for mindsdb / integrations / handlers / pirateweather_handler / pirateweather_handler.py: 0%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import os 

2from typing import Any 

3 

4import pandas as pd 

5import requests 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb_sql_parser import ast 

8 

9from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE 

10from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor 

11from mindsdb.integrations.libs.api_handler import APIHandler, APITable 

12from mindsdb.integrations.libs.response import HandlerResponse, HandlerStatusResponse 

13from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions 

14from mindsdb.utilities.config import Config 

15 

16 

17class PirateWeatherAPIBaseTable(APITable): 

18 allowed_select_keys = {} 

19 columns = [] 

20 table_name = "" 

21 

22 def __init__(self, handler: "PirateWeatherAPIHandler"): 

23 super().__init__(handler) 

24 

25 def select(self, query: ast.Select) -> pd.DataFrame: 

26 """Select data from the collected weather data and return it as a pandas DataFrame. 

27 

28 Args: 

29 query (ast.Select): The SQL query to be executed. 

30 

31 Returns: 

32 pandas.DataFrame: A pandas DataFrame containing the selected data. 

33 """ 

34 conditions = extract_comparison_conditions(query.where) 

35 

36 params = {} 

37 for op, arg1, arg2 in conditions: 

38 if arg1 in self.allowed_select_keys: 

39 if op == "=": 

40 params[arg1] = arg2 

41 else: 

42 raise NotImplementedError(f"Operator for argument {arg1} is not supported: {op}") 

43 

44 if "latitude" not in params or "longitude" not in params: 

45 raise ValueError("Latitude and longitude are required") 

46 

47 result = self.handler.call_application_api(method_name=self.table_name, params=params) 

48 

49 # Reparse the query and run through SELECTQueryExecutor 

50 query_parser = SELECTQueryParser(query, table=self.table_name, columns=self.get_columns()) 

51 selected_columns, where_conditions, order_by_conditions, result_limit = query_parser.parse_query() 

52 

53 # Remove request parameters from where conditions 

54 where_conditions = [c for c in where_conditions if c[1] not in self.allowed_select_keys] 

55 

56 query_executor = SELECTQueryExecutor(result, selected_columns, where_conditions, order_by_conditions, 

57 result_limit) 

58 

59 return query_executor.execute_query() 

60 

61 def get_columns(self) -> list: 

62 return self.columns 

63 

64 

65class PiratePirateWeatherAPIHourlyTable(PirateWeatherAPIBaseTable): 

66 allowed_select_keys = { 

67 "latitude", 

68 "longitude", 

69 "time", 

70 "units" 

71 } 

72 columns = [ 

73 "localtime", 

74 "icon", 

75 "summary", 

76 "precipAccumulation", 

77 "precipType", 

78 "temperature", 

79 "apparentTemperature", 

80 "dewPoint", 

81 "pressure", 

82 "windSpeed", 

83 "windBearing", 

84 "cloudCover", 

85 "latitude", 

86 "longitude", 

87 "timezone", 

88 "offset" 

89 ] 

90 table_name = "hourly" 

91 

92 

93class PiratePirateWeatherAPIDailyTable(PirateWeatherAPIBaseTable): 

94 allowed_select_keys = { 

95 "latitude", 

96 "longitude", 

97 "time", 

98 "units" 

99 } 

100 

101 columns = [ 

102 "localtime", 

103 "icon", 

104 "summary", 

105 "sunriseTime", 

106 "sunsetTime", 

107 "moonPhase", 

108 "precipAccumulation", 

109 "precipType", 

110 "temperatureHigh", 

111 "temperatureHighTime", 

112 "temperatureLow", 

113 "temperatureLowTime", 

114 "apparentTemperatureHigh", 

115 "apparentTemperatureHighTime", 

116 "apparentTemperatureLow", 

117 "apparentTemperatureLowTime", 

118 "dewPoint", 

119 "pressure", 

120 "windSpeed", 

121 "windBearing", 

122 "cloudCover", 

123 "temperatureMin", 

124 "temperatureMinTime", 

125 "temperatureMax", 

126 "temperatureMaxTime", 

127 "apparentTemperatureMin", 

128 "apparentTemperatureMinTime", 

129 "apparentTemperatureMax", 

130 "apparentTemperatureMaxTime", 

131 "latitude", 

132 "longitude", 

133 "timezone", 

134 "offset" 

135 ] 

136 table_name = "daily" 

137 

138 

139class PirateWeatherAPIHandler(APIHandler): 

140 query_string_template = "https://timemachine.pirateweather.net/forecast/{api_key}/{latitude},{longitude}" 

141 

142 def __init__(self, name: str, **kwargs): 

143 super().__init__(name) 

144 self._tables = {} 

145 

146 args = kwargs.get("connection_data", {}) 

147 handler_config = Config().get("weather_handler", {}) 

148 

149 connection_args = {} 

150 

151 for k in ["api_key"]: 

152 if k in args: 

153 connection_args[k] = args[k] 

154 elif f"WEATHER_{k.upper()}" in os.environ: 

155 connection_args[k] = os.environ[f"WEATHER_{k.upper()}"] 

156 elif k in handler_config: 

157 connection_args[k] = handler_config[k] 

158 

159 self._api_key = connection_args["api_key"] 

160 

161 # Register tables 

162 self._register_table("hourly", PiratePirateWeatherAPIHourlyTable(self)) 

163 self._register_table("daily", PiratePirateWeatherAPIDailyTable(self)) 

164 

165 def _register_table(self, table_name: str, table_class: Any): 

166 self._tables[table_name] = table_class 

167 

168 def connect(self) -> HandlerStatusResponse: 

169 return HandlerStatusResponse(success=True) 

170 

171 def check_connection(self) -> HandlerStatusResponse: 

172 response = HandlerStatusResponse(False) 

173 

174 try: 

175 self.call_application_api(method_name="daily", params=dict(latitude=51.507351, 

176 longitude=-0.127758, 

177 time="1672578052")) 

178 response.success = True 

179 

180 except Exception as e: 

181 response.error_message = str(e) 

182 

183 return response 

184 

185 def native_query(self, query: Any): 

186 ast = parse_sql(query) 

187 table = str(ast.from_table) 

188 data = self._tables[table].select(ast) 

189 return HandlerResponse(RESPONSE_TYPE.TABLE, data_frame=data) 

190 

191 def call_application_api( 

192 self, method_name: str = None, params: dict = None 

193 ) -> pd.DataFrame: 

194 # This will implement api base on the native query 

195 # By processing native query to convert it to api callable parameters 

196 if method_name not in ["hourly", "daily"]: 

197 raise NotImplementedError(f"Method {method_name} is not implemented") 

198 

199 if "latitude" not in params or "longitude" not in params: 

200 raise ValueError("Latitude and longitude are required") 

201 

202 opt_params = { 

203 "exclude": "currently,minutely,alerts,hourly,daily".replace("," + method_name, ""), 

204 "units": params.get("units"), 

205 } 

206 

207 # Build the query 

208 query = self.query_string_template.format( 

209 api_key=self._api_key, 

210 latitude=params["latitude"], 

211 longitude=params["longitude"] 

212 ) 

213 if "time" in params: 

214 query += f",{params['time']}" 

215 # Add optional parameters 

216 query += "?" + "&".join([f"{k}={v}" for k, v in opt_params.items() if v]) 

217 

218 # Call the API 

219 response = requests.get(query) 

220 response.raise_for_status() 

221 

222 # Parse the response 

223 data = response.json() 

224 if method_name not in data: 

225 raise ValueError(f"API response did not contain {method_name} data. Check your API key. Got response: {data}") 

226 

227 # Convert to dataframe 

228 df = pd.DataFrame(data[method_name]["data"]).assign( 

229 latitude=params["latitude"], 

230 longitude=params["longitude"], 

231 timezone=data["timezone"], 

232 offset=data["offset"] 

233 ) 

234 df["localtime"] = pd.to_datetime(df["time"], utc=True, unit="s").dt.tz_convert(data["timezone"]) 

235 df.drop(columns="time", inplace=True) 

236 return df