Coverage for mindsdb / integrations / handlers / aqicn_handler / aqicn_handler.py: 0%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb_sql_parser import parse_sql 

2 

3from mindsdb.integrations.handlers.aqicn_handler.aqicn_tables import ( 

4 AQByUserLocationTable, 

5 AQByCityTable, 

6 AQByLatLngTable, 

7 AQByNetworkStationTable 

8) 

9from mindsdb.integrations.handlers.aqicn_handler.aqicn import AQIClient 

10from mindsdb.integrations.libs.api_handler import APIHandler 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13) 

14from mindsdb.utilities import log 

15 

16 

17logger = log.getLogger(__name__) 

18 

19 

20class AQICNHandler(APIHandler): 

21 """The World Air Quality handler implementation""" 

22 

23 def __init__(self, name: str, **kwargs): 

24 """Initialize the aqicn handler. 

25 

26 Parameters 

27 ---------- 

28 name : str 

29 name of a handler instance 

30 """ 

31 super().__init__(name) 

32 

33 connection_data = kwargs.get("connection_data", {}) 

34 self.connection_data = connection_data 

35 self.kwargs = kwargs 

36 self.aqicn_client = None 

37 self.is_connected = False 

38 

39 ai_user_location_data = AQByUserLocationTable(self) 

40 self._register_table("air_quality_user_location", ai_user_location_data) 

41 

42 ai_city_data = AQByCityTable(self) 

43 self._register_table("air_quality_city", ai_city_data) 

44 

45 ai_lat_lng_data = AQByLatLngTable(self) 

46 self._register_table("air_quality_lat_lng", ai_lat_lng_data) 

47 

48 aq_network_station_data = AQByNetworkStationTable(self) 

49 self._register_table("air_quality_station_by_name", aq_network_station_data) 

50 

51 def connect(self) -> StatusResponse: 

52 """Set up the connection required by the handler. 

53 

54 Returns 

55 ------- 

56 StatusResponse 

57 connection object 

58 """ 

59 resp = StatusResponse(False) 

60 self.aqicn_client = AQIClient(self.connection_data.get("api_key")) 

61 content = self.aqicn_client.air_quality_user_location() 

62 if content["code"] != 200: 

63 resp.success = False 

64 resp.error_message = content["content"]["data"] 

65 self.is_connected = False 

66 return resp 

67 self.is_connected = True 

68 resp.success = True 

69 return resp 

70 

71 def check_connection(self) -> StatusResponse: 

72 """Check connection to the handler. 

73 

74 Returns 

75 ------- 

76 StatusResponse 

77 Status confirmation 

78 """ 

79 response = self.connect() 

80 self.is_connected = response.success 

81 return response 

82 

83 def native_query(self, query: str) -> StatusResponse: 

84 """Receive and process a raw query. 

85 

86 Parameters 

87 ---------- 

88 query : str 

89 query in a native format 

90 

91 Returns 

92 ------- 

93 StatusResponse 

94 Request status 

95 """ 

96 ast = parse_sql(query) 

97 return self.query(ast)