Coverage for mindsdb / integrations / handlers / influxdb_handler / influxdb_handler.py: 0%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from influxdb_client_3 import InfluxDBClient3 

2from mindsdb.integrations.handlers.influxdb_handler.influxdb_tables import InfluxDBTables 

3from mindsdb.integrations.libs.api_handler import APIHandler 

4from mindsdb.integrations.libs.response import ( 

5 HandlerStatusResponse as StatusResponse, 

6) 

7from mindsdb.utilities import log 

8from mindsdb_sql_parser import parse_sql 

9 

10 

11logger = log.getLogger(__name__) 

12 

13 

14class InfluxDBHandler(APIHandler): 

15 """InfluxDB handler implementation""" 

16 

17 def __init__(self, name=None, **kwargs): 

18 """Initialize the InfluxDB handler. 

19 Parameters 

20 ---------- 

21 name : str 

22 name of a handler instance 

23 """ 

24 super().__init__(name) 

25 

26 connection_data = kwargs.get("connection_data", {}) 

27 

28 self.parser = parse_sql 

29 self.dialect = 'influxdb' 

30 self.connection_data = connection_data 

31 self.kwargs = kwargs 

32 self.connection = None 

33 self.is_connected = False 

34 

35 influxdb_tables_data = InfluxDBTables(self) 

36 self._register_table("tables", influxdb_tables_data) 

37 

38 def connect(self): 

39 """Set up the connection required by the handler. 

40 Returns 

41 ------- 

42 None 

43 

44 Raises Expection if ping check fails 

45 """ 

46 

47 if self.is_connected is True: 

48 return self.connection 

49 

50 self.connection = InfluxDBClient3(host=self.connection_data['influxdb_url'], token=self.connection_data['influxdb_token'], org=self.connection_data.get('org')) 

51 

52 self.is_connected = True 

53 

54 return self.connection 

55 

56 def check_connection(self) -> StatusResponse: 

57 """Check connection to the handler. 

58 Returns 

59 ------- 

60 StatusResponse 

61 Status confirmation 

62 """ 

63 response = StatusResponse(False) 

64 

65 try: 

66 self.connect() 

67 response.success = True 

68 except Exception as e: 

69 logger.error(f"Error connecting to InfluxDB API: {e}!") 

70 response.error_message = e 

71 

72 self.is_connected = response.success 

73 

74 return response 

75 

76 def native_query(self, query: str) -> StatusResponse: 

77 """Receive and process a raw query. 

78 Parameters 

79 ---------- 

80 query : str 

81 query in a native format 

82 Returns 

83 ------- 

84 StatusResponse 

85 Request status 

86 """ 

87 ast = parse_sql(query) 

88 return self.query(ast) 

89 

90 def call_influxdb_tables(self, query): 

91 """Pulls all the records from the given InfluxDB table and returns it select() 

92 

93 Returns 

94 ------- 

95 pd.DataFrame of all the records of the particular InfluxDB 

96 """ 

97 influx_connection = self.connect() 

98 if query is None: 

99 query = 'SELECT * FROM ' + f"{self.connection_data['influxdb_table_name']}" 

100 

101 table = influx_connection.query(query=query, database=self.connection_data['influxdb_db_name'], language='sql') 

102 return table.to_pandas()