Coverage for mindsdb / integrations / handlers / aqicn_handler / aqicn_handler.py: 0%
42 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb_sql_parser import parse_sql
3from mindsdb.integrations.handlers.aqicn_handler.aqicn_tables import (
4 AQByUserLocationTable,
5 AQByCityTable,
6 AQByLatLngTable,
7 AQByNetworkStationTable
8)
9from mindsdb.integrations.handlers.aqicn_handler.aqicn import AQIClient
10from mindsdb.integrations.libs.api_handler import APIHandler
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13)
14from mindsdb.utilities import log
17logger = log.getLogger(__name__)
class AQICNHandler(APIHandler):
    """The World Air Quality handler implementation.

    On construction the handler registers four tables
    (``air_quality_user_location``, ``air_quality_city``,
    ``air_quality_lat_lng`` and ``air_quality_station_by_name``).  The
    ``AQIClient`` used to reach the service is created lazily by
    :meth:`connect`.
    """

    def __init__(self, name: str, **kwargs):
        """Initialize the aqicn handler.

        Parameters
        ----------
        name : str
            name of a handler instance
        **kwargs
            Extra handler arguments.  ``connection_data`` (defaults to an
            empty dict) is read from here; the whole mapping is kept on
            ``self.kwargs``.
        """
        super().__init__(name)

        self.connection_data = kwargs.get("connection_data", {})
        self.kwargs = kwargs
        self.aqicn_client = None
        self.is_connected = False

        # Each table object receives the handler itself; tables are built and
        # registered one after another, in this order.
        for table_name, table_cls in (
            ("air_quality_user_location", AQByUserLocationTable),
            ("air_quality_city", AQByCityTable),
            ("air_quality_lat_lng", AQByLatLngTable),
            ("air_quality_station_by_name", AQByNetworkStationTable),
        ):
            self._register_table(table_name, table_cls(self))

    def connect(self) -> StatusResponse:
        """Set up the connection required by the handler.

        Builds a new ``AQIClient`` from the ``api_key`` entry of
        ``self.connection_data`` and probes the service with
        ``air_quality_user_location()``.

        Returns
        -------
        StatusResponse
            ``success`` is True when the probe's ``"code"`` is 200.
            Otherwise ``success`` is False and ``error_message`` holds
            ``content["content"]["data"]`` from the probe result.

        Notes
        -----
        Exceptions raised by the client call, or by indexing its result, are
        not handled here and propagate to the caller.
        """
        resp = StatusResponse(False)
        self.aqicn_client = AQIClient(self.connection_data.get("api_key"))
        content = self.aqicn_client.air_quality_user_location()

        if content["code"] != 200:
            # ``resp`` was created as a failure, so only the message is added.
            resp.error_message = content["content"]["data"]
            self.is_connected = False
            return resp

        self.is_connected = True
        resp.success = True
        return resp

    def check_connection(self) -> StatusResponse:
        """Check connection to the handler.

        Returns
        -------
        StatusResponse
            Status confirmation.  If :meth:`connect` raises, the exception is
            logged and reported as a failed status whose ``error_message`` is
            the exception text, instead of being propagated.
        """
        try:
            response = self.connect()
        except Exception as exc:  # status-check boundary: report, don't crash
            logger.error("Error connecting to AQICN API: %s", exc)
            self.is_connected = False
            response = StatusResponse(False)
            response.error_message = str(exc)
            return response

        self.is_connected = response.success
        return response

    def native_query(self, query: str) -> StatusResponse:
        """Receive and process a raw query.

        Parameters
        ----------
        query : str
            query in a native format

        Returns
        -------
        StatusResponse
            Whatever ``self.query`` returns for the parsed statement.
            NOTE(review): ``query`` is inherited, so the declared return type
            may not match the object actually returned -- confirm against
            ``APIHandler.query``.
        """
        ast = parse_sql(query)
        return self.query(ast)