Coverage for mindsdb / integrations / handlers / aqicn_handler / aqicn_tables.py: 0%
115 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pandas as pd
2from typing import List
3from mindsdb.integrations.libs.api_handler import APITable
4from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor
5from mindsdb.utilities import log
6from mindsdb_sql_parser import ast
# Module-level logger, named after this module via __name__.
logger = log.getLogger(__name__)
class AQByUserLocationTable(APITable):
    """The Air Quality By User Location Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the https://aqicn.org/json-api/doc/#api-Geolocalized_Feed-GetHereFeed API

        Parameters
        ----------
        query : ast.Select
           Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Air Quality Data

        Raises
        ------
        Exception
            If the API response code is not 200
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'air_quality_user_location',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        # Only conditions on known columns are applied to the fetched data;
        # conditions on any other name are dropped rather than rejected.
        known_columns = self.get_columns()
        subset_where_conditions = [
            [op, arg1, arg2]
            for op, arg1, arg2 in where_conditions
            if arg1 in known_columns
        ]

        response = self.handler.aqicn_client.air_quality_user_location()
        self.check_res(res=response)

        df = pd.json_normalize(response["content"])

        select_statement_executor = SELECTQueryExecutor(
            df,
            selected_columns,
            subset_where_conditions,
            order_by_conditions,
            result_limit
        )
        return select_statement_executor.execute_query()

    def check_res(self, res) -> None:
        """Raises if the client response does not carry a 200 code

        Parameters
        ----------
        res : mapping
            Client response; its "code" and "content" entries are read

        Raises
        ------
        Exception
            If res["code"] is not 200
        """
        if res["code"] != 200:
            # Format instead of concatenating: "+" would raise TypeError and
            # hide the real failure whenever the content is not a str.
            raise Exception(f"Error fetching results - {res['content']}")

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """
        # NOTE(review): unlike the city and lat/lng tables in this file, this
        # list has no "data.iaqi.o3.v" / "data.iaqi.pm25.v" entries - confirm
        # whether that is intentional before adding them.
        return [
            "status",
            "data.aqi",
            "data.idx",
            "data.attributions",
            "data.city.geo",
            "data.city.name",
            "data.city.url",
            "data.city.location",
            "data.dominentpol",
            "data.iaqi.co.v",
            "data.iaqi.dew.v",
            "data.iaqi.h.v",
            "data.iaqi.no2.v",
            "data.iaqi.p.v",
            "data.iaqi.pm10.v",
            "data.iaqi.so2.v",
            "data.iaqi.t.v",
            "data.iaqi.w.v",
            "data.time.s",
            "data.time.tz",
            "data.time.v",
            "data.time.iso",
            "data.forecast.daily.o3",
            "data.forecast.daily.pm10",
            "data.forecast.daily.pm25",
            "data.debug.sync"
        ]
class AQByCityTable(APITable):
    """The Air Quality By City Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the https://aqicn.org/json-api/doc/#api-City_Feed-GetCityFeed API

        Parameters
        ----------
        query : ast.Select
           Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Air Quality Data

        Raises
        ------
        NotImplementedError
            If the city column is missing from the where clause or is used
            with an operator other than '='
        Exception
            If the API response code is not 200
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'air_quality_city',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        known_columns = self.get_columns()
        search_params = {}
        subset_where_conditions = []
        for op, arg1, arg2 in where_conditions:
            if arg1 == 'city':
                # "city" is an API argument, not a column of the result.
                if op != '=':
                    raise NotImplementedError("Only '=' operator is supported for city column.")
                search_params["city"] = arg2
            elif arg1 in known_columns:
                # Conditions on known columns are applied to the fetched data;
                # conditions on any other name are dropped.
                subset_where_conditions.append([op, arg1, arg2])

        if "city" not in search_params:
            raise NotImplementedError("city column has to be present in where clause.")

        response = self.handler.aqicn_client.air_quality_city(search_params["city"])
        self.check_res(res=response)

        df = pd.json_normalize(response["content"])

        select_statement_executor = SELECTQueryExecutor(
            df,
            selected_columns,
            subset_where_conditions,
            order_by_conditions,
            result_limit
        )
        return select_statement_executor.execute_query()

    def check_res(self, res) -> None:
        """Raises if the client response does not carry a 200 code

        Parameters
        ----------
        res : mapping
            Client response; its "code" and "content" entries are read

        Raises
        ------
        Exception
            If res["code"] is not 200
        """
        if res["code"] != 200:
            # Format instead of concatenating: "+" would raise TypeError and
            # hide the real failure whenever the content is not a str.
            raise Exception(f"Error fetching results - {res['content']}")

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """
        return [
            "status",
            "data.aqi",
            "data.idx",
            "data.attributions",
            "data.city.geo",
            "data.city.name",
            "data.city.url",
            "data.city.location",
            "data.dominentpol",
            "data.iaqi.co.v",
            "data.iaqi.dew.v",
            "data.iaqi.h.v",
            "data.iaqi.no2.v",
            "data.iaqi.o3.v",
            "data.iaqi.p.v",
            "data.iaqi.pm10.v",
            "data.iaqi.pm25.v",
            "data.iaqi.so2.v",
            "data.iaqi.t.v",
            "data.iaqi.w.v",
            "data.time.s",
            "data.time.tz",
            "data.time.v",
            "data.time.iso",
            "data.forecast.daily.o3",
            "data.forecast.daily.pm10",
            "data.forecast.daily.pm25",
            "data.debug.sync"
        ]
class AQByLatLngTable(APITable):
    """The Air Quality By Lat Lng Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the https://aqicn.org/json-api/doc/#api-Geolocalized_Feed-GetGeolocFeed API

        Parameters
        ----------
        query : ast.Select
           Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Air Quality Data

        Raises
        ------
        NotImplementedError
            If the lat or lng column is missing from the where clause or is
            used with an operator other than '='
        Exception
            If the API response code is not 200
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'air_quality_lat_lng',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        known_columns = self.get_columns()
        search_params = {}
        subset_where_conditions = []
        for op, arg1, arg2 in where_conditions:
            # One exclusive chain: a condition is either the lat argument, the
            # lng argument, or a filter on a known result column.
            if arg1 == 'lat':
                if op != '=':
                    raise NotImplementedError("Only '=' operator is supported for lat column.")
                search_params["lat"] = arg2
            elif arg1 == 'lng':
                if op != '=':
                    raise NotImplementedError("Only '=' operator is supported for lng column.")
                search_params["lng"] = arg2
            elif arg1 in known_columns:
                # Conditions on known columns are applied to the fetched data;
                # conditions on any other name are dropped.
                subset_where_conditions.append([op, arg1, arg2])

        if "lat" not in search_params or "lng" not in search_params:
            raise NotImplementedError("lat and lng columns have to be present in where clause.")

        response = self.handler.aqicn_client.air_quality_lat_lng(search_params["lat"], search_params["lng"])
        self.check_res(res=response)

        df = pd.json_normalize(response["content"])

        select_statement_executor = SELECTQueryExecutor(
            df,
            selected_columns,
            subset_where_conditions,
            order_by_conditions,
            result_limit
        )
        return select_statement_executor.execute_query()

    def check_res(self, res) -> None:
        """Raises if the client response does not carry a 200 code

        Parameters
        ----------
        res : mapping
            Client response; its "code" and "content" entries are read

        Raises
        ------
        Exception
            If res["code"] is not 200
        """
        if res["code"] != 200:
            # Format instead of concatenating: "+" would raise TypeError and
            # hide the real failure whenever the content is not a str.
            raise Exception(f"Error fetching results - {res['content']}")

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """
        return [
            "status",
            "data.aqi",
            "data.idx",
            "data.attributions",
            "data.city.geo",
            "data.city.name",
            "data.city.url",
            "data.city.location",
            "data.dominentpol",
            "data.iaqi.co.v",
            "data.iaqi.dew.v",
            "data.iaqi.h.v",
            "data.iaqi.no2.v",
            "data.iaqi.o3.v",
            "data.iaqi.p.v",
            "data.iaqi.pm10.v",
            "data.iaqi.pm25.v",
            "data.iaqi.so2.v",
            "data.iaqi.t.v",
            "data.iaqi.w.v",
            "data.time.s",
            "data.time.tz",
            "data.time.v",
            "data.time.iso",
            "data.forecast.daily.o3",
            "data.forecast.daily.pm10",
            "data.forecast.daily.pm25",
            "data.debug.sync"
        ]
class AQByNetworkStationTable(APITable):
    """The Air Quality By Network Station Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the https://aqicn.org/json-api/doc/#api-Search-SearchByName API

        Parameters
        ----------
        query : ast.Select
           Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Air Quality Data

        Raises
        ------
        NotImplementedError
            If the name column is missing from the where clause or is used
            with an operator other than '='
        Exception
            If the API response code is not 200
        """
        select_statement_parser = SELECTQueryParser(
            query,
            'air_quality_station_by_name',
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        known_columns = self.get_columns()
        search_params = {}
        subset_where_conditions = []
        for op, arg1, arg2 in where_conditions:
            if arg1 == 'name':
                # "name" is an API argument, not a column of the result.
                if op != '=':
                    raise NotImplementedError("Only '=' operator is supported for name column.")
                search_params["name"] = arg2
            elif arg1 in known_columns:
                # Conditions on known columns are applied to the fetched data;
                # conditions on any other name are dropped.
                subset_where_conditions.append([op, arg1, arg2])

        if "name" not in search_params:
            raise NotImplementedError("name column have to be present in where clause.")

        response = self.handler.aqicn_client.air_quality_station_by_name(search_params["name"])
        self.check_res(res=response)

        # Rows come from the "data" entry of the response content.
        df = pd.json_normalize(response["content"]["data"])

        select_statement_executor = SELECTQueryExecutor(
            df,
            selected_columns,
            subset_where_conditions,
            order_by_conditions,
            result_limit
        )
        return select_statement_executor.execute_query()

    def check_res(self, res) -> None:
        """Raises if the client response does not carry a 200 code

        Parameters
        ----------
        res : mapping
            Client response; its "code" and "content" entries are read

        Raises
        ------
        Exception
            If res["code"] is not 200
        """
        if res["code"] != 200:
            # Format instead of concatenating: "+" would raise TypeError and
            # hide the real failure whenever the content is not a str.
            raise Exception(f"Error fetching results - {res['content']}")

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """
        return [
            'uid',
            'aqi',
            'time.tz',
            'time.stime',
            'time.vtime',
            'station.name',
            'station.geo',
            'station.url',
            'station.country'
        ]