Coverage for mindsdb / integrations / handlers / aqicn_handler / aqicn_tables.py: 0%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2from typing import List 

3from mindsdb.integrations.libs.api_handler import APITable 

4from mindsdb.integrations.utilities.handlers.query_utilities import SELECTQueryParser, SELECTQueryExecutor 

5from mindsdb.utilities import log 

6from mindsdb_sql_parser import ast 

7 

8logger = log.getLogger(__name__) 

9 

10 

class AQByUserLocationTable(APITable):
    """The Air Quality By User Location Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the https://aqicn.org/json-api/doc/#api-Geolocalized_Feed-GetHereFeed API

        The client call takes no arguments, so no WHERE condition is required.
        Conditions on columns exposed by this table are applied locally to the
        returned data; conditions on any other column are ignored.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Air Quality Data

        Raises
        ------
        Exception
            If the API response code is not 200
        """
        columns = self.get_columns()

        select_statement_parser = SELECTQueryParser(
            query,
            'air_quality_user_location',
            columns
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        # Only conditions on known columns can be evaluated against the fetched data.
        subset_where_conditions = [
            [op, arg1, arg2]
            for op, arg1, arg2 in where_conditions
            if arg1 in columns
        ]

        response = self.handler.aqicn_client.air_quality_user_location()
        self.check_res(res=response)

        df = self._with_all_columns(pd.json_normalize(response["content"]), columns)

        select_statement_executor = SELECTQueryExecutor(
            df,
            selected_columns,
            subset_where_conditions,
            order_by_conditions,
            result_limit
        )
        return select_statement_executor.execute_query()

    def check_res(self, res):
        """Raise if the API call did not return code 200.

        ``res["content"]`` is formatted with an f-string rather than concatenated,
        so a non-string error payload (e.g. a decoded JSON dict) still produces
        a readable message instead of a TypeError.
        """
        if res["code"] != 200:
            raise Exception(f"Error fetching results - {res['content']}")

    @staticmethod
    def _with_all_columns(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
        """Return ``df`` with every name in ``columns`` present, appending absent ones as NaN.

        The normalized payload only contains the fields the API actually returned,
        so selecting or filtering on a declared column must not depend on that
        field being present in this particular response. Existing columns are kept.
        """
        missing = [column for column in columns if column not in df.columns]
        if missing:
            df = df.reindex(columns=[*df.columns, *missing])
        return df

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """

        return [
            "status",
            "data.aqi",
            "data.idx",
            "data.attributions",
            "data.city.geo",
            "data.city.name",
            "data.city.url",
            "data.city.location",
            "data.dominentpol",
            "data.iaqi.co.v",
            "data.iaqi.dew.v",
            "data.iaqi.h.v",
            "data.iaqi.no2.v",
            "data.iaqi.o3.v",
            "data.iaqi.p.v",
            "data.iaqi.pm10.v",
            "data.iaqi.pm25.v",
            "data.iaqi.so2.v",
            "data.iaqi.t.v",
            "data.iaqi.w.v",
            "data.time.s",
            "data.time.tz",
            "data.time.v",
            "data.time.iso",
            "data.forecast.daily.o3",
            "data.forecast.daily.pm10",
            "data.forecast.daily.pm25",
            "data.debug.sync"
        ]

107 

108 

class AQByCityTable(APITable):
    """The Air Quality By City Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the https://aqicn.org/json-api/doc/#api-City_Feed-GetCityFeed API

        A ``city = '<value>'`` condition is required; its value is passed to the
        AQICN client. Conditions on columns exposed by this table are applied
        locally to the returned data; conditions on any other column are ignored.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Air Quality Data

        Raises
        ------
        NotImplementedError
            If ``city`` is filtered with an operator other than '=' or is missing
            from the WHERE clause
        Exception
            If the API response code is not 200
        """
        columns = self.get_columns()

        select_statement_parser = SELECTQueryParser(
            query,
            'air_quality_city',
            columns
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        search_params = {}
        subset_where_conditions = []
        for op, arg1, arg2 in where_conditions:
            if arg1 == 'city':
                # `city` is an API parameter, not a result column, so only equality can be pushed down.
                if op != '=':
                    raise NotImplementedError("Only '=' operator is supported for city column.")
                search_params["city"] = arg2
            elif arg1 in columns:
                subset_where_conditions.append([op, arg1, arg2])

        if "city" not in search_params:
            raise NotImplementedError("city column has to be present in where clause.")

        response = self.handler.aqicn_client.air_quality_city(search_params["city"])
        self.check_res(res=response)

        df = self._with_all_columns(pd.json_normalize(response["content"]), columns)

        select_statement_executor = SELECTQueryExecutor(
            df,
            selected_columns,
            subset_where_conditions,
            order_by_conditions,
            result_limit
        )
        return select_statement_executor.execute_query()

    def check_res(self, res):
        """Raise if the API call did not return code 200.

        ``res["content"]`` is formatted with an f-string rather than concatenated,
        so a non-string error payload (e.g. a decoded JSON dict) still produces
        a readable message instead of a TypeError.
        """
        if res["code"] != 200:
            raise Exception(f"Error fetching results - {res['content']}")

    @staticmethod
    def _with_all_columns(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
        """Return ``df`` with every name in ``columns`` present, appending absent ones as NaN.

        The normalized payload only contains the fields the API actually returned,
        so selecting or filtering on a declared column must not depend on that
        field being present in this particular response. Existing columns are kept.
        """
        missing = [column for column in columns if column not in df.columns]
        if missing:
            df = df.reindex(columns=[*df.columns, *missing])
        return df

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """

        return [
            "status",
            "data.aqi",
            "data.idx",
            "data.attributions",
            "data.city.geo",
            "data.city.name",
            "data.city.url",
            "data.city.location",
            "data.dominentpol",
            "data.iaqi.co.v",
            "data.iaqi.dew.v",
            "data.iaqi.h.v",
            "data.iaqi.no2.v",
            "data.iaqi.o3.v",
            "data.iaqi.p.v",
            "data.iaqi.pm10.v",
            "data.iaqi.pm25.v",
            "data.iaqi.so2.v",
            "data.iaqi.t.v",
            "data.iaqi.w.v",
            "data.time.s",
            "data.time.tz",
            "data.time.v",
            "data.time.iso",
            "data.forecast.daily.o3",
            "data.forecast.daily.pm10",
            "data.forecast.daily.pm25",
            "data.debug.sync"
        ]

218 

219 

class AQByLatLngTable(APITable):
    """The Air Quality By Lat Lng Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the https://aqicn.org/json-api/doc/#api-Geolocalized_Feed-GetGeolocFeed API

        Both ``lat = <value>`` and ``lng = <value>`` conditions are required; their
        values are passed to the AQICN client. Conditions on columns exposed by
        this table are applied locally to the returned data; conditions on any
        other column are ignored.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Air Quality Data

        Raises
        ------
        NotImplementedError
            If ``lat``/``lng`` is filtered with an operator other than '=' or
            either one is missing from the WHERE clause
        Exception
            If the API response code is not 200
        """
        columns = self.get_columns()

        select_statement_parser = SELECTQueryParser(
            query,
            'air_quality_lat_lng',
            columns
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        search_params = {}
        subset_where_conditions = []
        for op, arg1, arg2 in where_conditions:
            if arg1 in ('lat', 'lng'):
                # Coordinates are API parameters, not result columns, so only equality can be pushed down.
                if op != '=':
                    raise NotImplementedError(f"Only '=' operator is supported for {arg1} column.")
                search_params[arg1] = arg2
            elif arg1 in columns:
                subset_where_conditions.append([op, arg1, arg2])

        if "lat" not in search_params or "lng" not in search_params:
            raise NotImplementedError("lat and lng columns have to be present in where clause.")

        response = self.handler.aqicn_client.air_quality_lat_lng(search_params["lat"], search_params["lng"])
        self.check_res(res=response)

        df = self._with_all_columns(pd.json_normalize(response["content"]), columns)

        select_statement_executor = SELECTQueryExecutor(
            df,
            selected_columns,
            subset_where_conditions,
            order_by_conditions,
            result_limit
        )
        return select_statement_executor.execute_query()

    def check_res(self, res):
        """Raise if the API call did not return code 200.

        ``res["content"]`` is formatted with an f-string rather than concatenated,
        so a non-string error payload (e.g. a decoded JSON dict) still produces
        a readable message instead of a TypeError.
        """
        if res["code"] != 200:
            raise Exception(f"Error fetching results - {res['content']}")

    @staticmethod
    def _with_all_columns(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
        """Return ``df`` with every name in ``columns`` present, appending absent ones as NaN.

        The normalized payload only contains the fields the API actually returned,
        so selecting or filtering on a declared column must not depend on that
        field being present in this particular response. Existing columns are kept.
        """
        missing = [column for column in columns if column not in df.columns]
        if missing:
            df = df.reindex(columns=[*df.columns, *missing])
        return df

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """

        return [
            "status",
            "data.aqi",
            "data.idx",
            "data.attributions",
            "data.city.geo",
            "data.city.name",
            "data.city.url",
            "data.city.location",
            "data.dominentpol",
            "data.iaqi.co.v",
            "data.iaqi.dew.v",
            "data.iaqi.h.v",
            "data.iaqi.no2.v",
            "data.iaqi.o3.v",
            "data.iaqi.p.v",
            "data.iaqi.pm10.v",
            "data.iaqi.pm25.v",
            "data.iaqi.so2.v",
            "data.iaqi.t.v",
            "data.iaqi.w.v",
            "data.time.s",
            "data.time.tz",
            "data.time.v",
            "data.time.iso",
            "data.forecast.daily.o3",
            "data.forecast.daily.pm10",
            "data.forecast.daily.pm25",
            "data.debug.sync"
        ]

334 

335 

class AQByNetworkStationTable(APITable):
    """The Air Quality By Network Station Table implementation"""

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the https://aqicn.org/json-api/doc/#api-Search-SearchByName API

        A ``name = '<value>'`` condition is required; its value is passed to the
        AQICN client. Each entry of the response's ``data`` list becomes one row.
        Conditions on columns exposed by this table are applied locally; conditions
        on any other column are ignored.

        Parameters
        ----------
        query : ast.Select
            Given SQL SELECT query

        Returns
        -------
        pd.DataFrame
            Air Quality Data

        Raises
        ------
        NotImplementedError
            If ``name`` is filtered with an operator other than '=' or is missing
            from the WHERE clause
        Exception
            If the API response code is not 200
        """
        columns = self.get_columns()

        select_statement_parser = SELECTQueryParser(
            query,
            'air_quality_station_by_name',
            columns
        )
        selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query()

        search_params = {}
        subset_where_conditions = []
        for op, arg1, arg2 in where_conditions:
            if arg1 == 'name':
                # `name` is the search keyword sent to the API, so only equality can be pushed down.
                if op != '=':
                    raise NotImplementedError("Only '=' operator is supported for name column.")
                search_params["name"] = arg2
            elif arg1 in columns:
                subset_where_conditions.append([op, arg1, arg2])

        if "name" not in search_params:
            raise NotImplementedError("name column has to be present in where clause.")

        response = self.handler.aqicn_client.air_quality_station_by_name(search_params["name"])
        self.check_res(res=response)

        # A search with no matches yields an empty list, whose normalized frame has no
        # columns at all; padding gives it the declared schema with zero rows.
        df = self._with_all_columns(pd.json_normalize(response["content"]["data"]), columns)

        select_statement_executor = SELECTQueryExecutor(
            df,
            selected_columns,
            subset_where_conditions,
            order_by_conditions,
            result_limit
        )
        return select_statement_executor.execute_query()

    def check_res(self, res):
        """Raise if the API call did not return code 200.

        ``res["content"]`` is formatted with an f-string rather than concatenated,
        so a non-string error payload (e.g. a decoded JSON dict) still produces
        a readable message instead of a TypeError.
        """
        if res["code"] != 200:
            raise Exception(f"Error fetching results - {res['content']}")

    @staticmethod
    def _with_all_columns(df: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
        """Return ``df`` with every name in ``columns`` present, appending absent ones as NaN.

        The normalized payload only contains the fields the API actually returned,
        so selecting or filtering on a declared column must not depend on that
        field being present in this particular response. Existing columns are kept.
        """
        missing = [column for column in columns if column not in df.columns]
        if missing:
            df = df.reindex(columns=[*df.columns, *missing])
        return df

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Returns
        -------
        List[str]
            List of columns
        """

        return [
            'uid',
            'aqi',
            'time.tz',
            'time.stime',
            'time.vtime',
            'station.name',
            'station.geo',
            'station.url',
            'station.country'
        ]