Coverage for mindsdb / integrations / handlers / pirateweather_handler / pirateweather_handler.py: 0%
98 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import os
2from typing import Any
4import pandas as pd
5import requests
6from mindsdb_sql_parser import parse_sql
7from mindsdb_sql_parser import ast
9from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE
10from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor
11from mindsdb.integrations.libs.api_handler import APIHandler, APITable
12from mindsdb.integrations.libs.response import HandlerResponse, HandlerStatusResponse
13from mindsdb.integrations.utilities.sql_utils import extract_comparison_conditions
14from mindsdb.utilities.config import Config
17class PirateWeatherAPIBaseTable(APITable):
18 allowed_select_keys = {}
19 columns = []
20 table_name = ""
22 def __init__(self, handler: "PirateWeatherAPIHandler"):
23 super().__init__(handler)
25 def select(self, query: ast.Select) -> pd.DataFrame:
26 """Select data from the collected weather data and return it as a pandas DataFrame.
28 Args:
29 query (ast.Select): The SQL query to be executed.
31 Returns:
32 pandas.DataFrame: A pandas DataFrame containing the selected data.
33 """
34 conditions = extract_comparison_conditions(query.where)
36 params = {}
37 for op, arg1, arg2 in conditions:
38 if arg1 in self.allowed_select_keys:
39 if op == "=":
40 params[arg1] = arg2
41 else:
42 raise NotImplementedError(f"Operator for argument {arg1} is not supported: {op}")
44 if "latitude" not in params or "longitude" not in params:
45 raise ValueError("Latitude and longitude are required")
47 result = self.handler.call_application_api(method_name=self.table_name, params=params)
49 # Reparse the query and run through SELECTQueryExecutor
50 query_parser = SELECTQueryParser(query, table=self.table_name, columns=self.get_columns())
51 selected_columns, where_conditions, order_by_conditions, result_limit = query_parser.parse_query()
53 # Remove request parameters from where conditions
54 where_conditions = [c for c in where_conditions if c[1] not in self.allowed_select_keys]
56 query_executor = SELECTQueryExecutor(result, selected_columns, where_conditions, order_by_conditions,
57 result_limit)
59 return query_executor.execute_query()
61 def get_columns(self) -> list:
62 return self.columns
65class PiratePirateWeatherAPIHourlyTable(PirateWeatherAPIBaseTable):
66 allowed_select_keys = {
67 "latitude",
68 "longitude",
69 "time",
70 "units"
71 }
72 columns = [
73 "localtime",
74 "icon",
75 "summary",
76 "precipAccumulation",
77 "precipType",
78 "temperature",
79 "apparentTemperature",
80 "dewPoint",
81 "pressure",
82 "windSpeed",
83 "windBearing",
84 "cloudCover",
85 "latitude",
86 "longitude",
87 "timezone",
88 "offset"
89 ]
90 table_name = "hourly"
93class PiratePirateWeatherAPIDailyTable(PirateWeatherAPIBaseTable):
94 allowed_select_keys = {
95 "latitude",
96 "longitude",
97 "time",
98 "units"
99 }
101 columns = [
102 "localtime",
103 "icon",
104 "summary",
105 "sunriseTime",
106 "sunsetTime",
107 "moonPhase",
108 "precipAccumulation",
109 "precipType",
110 "temperatureHigh",
111 "temperatureHighTime",
112 "temperatureLow",
113 "temperatureLowTime",
114 "apparentTemperatureHigh",
115 "apparentTemperatureHighTime",
116 "apparentTemperatureLow",
117 "apparentTemperatureLowTime",
118 "dewPoint",
119 "pressure",
120 "windSpeed",
121 "windBearing",
122 "cloudCover",
123 "temperatureMin",
124 "temperatureMinTime",
125 "temperatureMax",
126 "temperatureMaxTime",
127 "apparentTemperatureMin",
128 "apparentTemperatureMinTime",
129 "apparentTemperatureMax",
130 "apparentTemperatureMaxTime",
131 "latitude",
132 "longitude",
133 "timezone",
134 "offset"
135 ]
136 table_name = "daily"
139class PirateWeatherAPIHandler(APIHandler):
140 query_string_template = "https://timemachine.pirateweather.net/forecast/{api_key}/{latitude},{longitude}"
142 def __init__(self, name: str, **kwargs):
143 super().__init__(name)
144 self._tables = {}
146 args = kwargs.get("connection_data", {})
147 handler_config = Config().get("weather_handler", {})
149 connection_args = {}
151 for k in ["api_key"]:
152 if k in args:
153 connection_args[k] = args[k]
154 elif f"WEATHER_{k.upper()}" in os.environ:
155 connection_args[k] = os.environ[f"WEATHER_{k.upper()}"]
156 elif k in handler_config:
157 connection_args[k] = handler_config[k]
159 self._api_key = connection_args["api_key"]
161 # Register tables
162 self._register_table("hourly", PiratePirateWeatherAPIHourlyTable(self))
163 self._register_table("daily", PiratePirateWeatherAPIDailyTable(self))
165 def _register_table(self, table_name: str, table_class: Any):
166 self._tables[table_name] = table_class
168 def connect(self) -> HandlerStatusResponse:
169 return HandlerStatusResponse(success=True)
171 def check_connection(self) -> HandlerStatusResponse:
172 response = HandlerStatusResponse(False)
174 try:
175 self.call_application_api(method_name="daily", params=dict(latitude=51.507351,
176 longitude=-0.127758,
177 time="1672578052"))
178 response.success = True
180 except Exception as e:
181 response.error_message = str(e)
183 return response
185 def native_query(self, query: Any):
186 ast = parse_sql(query)
187 table = str(ast.from_table)
188 data = self._tables[table].select(ast)
189 return HandlerResponse(RESPONSE_TYPE.TABLE, data_frame=data)
191 def call_application_api(
192 self, method_name: str = None, params: dict = None
193 ) -> pd.DataFrame:
194 # This will implement api base on the native query
195 # By processing native query to convert it to api callable parameters
196 if method_name not in ["hourly", "daily"]:
197 raise NotImplementedError(f"Method {method_name} is not implemented")
199 if "latitude" not in params or "longitude" not in params:
200 raise ValueError("Latitude and longitude are required")
202 opt_params = {
203 "exclude": "currently,minutely,alerts,hourly,daily".replace("," + method_name, ""),
204 "units": params.get("units"),
205 }
207 # Build the query
208 query = self.query_string_template.format(
209 api_key=self._api_key,
210 latitude=params["latitude"],
211 longitude=params["longitude"]
212 )
213 if "time" in params:
214 query += f",{params['time']}"
215 # Add optional parameters
216 query += "?" + "&".join([f"{k}={v}" for k, v in opt_params.items() if v])
218 # Call the API
219 response = requests.get(query)
220 response.raise_for_status()
222 # Parse the response
223 data = response.json()
224 if method_name not in data:
225 raise ValueError(f"API response did not contain {method_name} data. Check your API key. Got response: {data}")
227 # Convert to dataframe
228 df = pd.DataFrame(data[method_name]["data"]).assign(
229 latitude=params["latitude"],
230 longitude=params["longitude"],
231 timezone=data["timezone"],
232 offset=data["offset"]
233 )
234 df["localtime"] = pd.to_datetime(df["time"], utc=True, unit="s").dt.tz_convert(data["timezone"])
235 df.drop(columns="time", inplace=True)
236 return df