Coverage for mindsdb/integrations/handlers/influxdb_handler/influxdb_handler.py: 0%
44 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from influxdb_client_3 import InfluxDBClient3
2from mindsdb.integrations.handlers.influxdb_handler.influxdb_tables import InfluxDBTables
3from mindsdb.integrations.libs.api_handler import APIHandler
4from mindsdb.integrations.libs.response import (
5 HandlerStatusResponse as StatusResponse,
6)
7from mindsdb.utilities import log
8from mindsdb_sql_parser import parse_sql
# Module-level logger, named after this module via __name__.
logger = log.getLogger(__name__)
class InfluxDBHandler(APIHandler):
    """InfluxDB handler implementation.

    Holds an ``InfluxDBClient3`` connection built from ``connection_data`` and
    registers one table, ``"tables"``, backed by ``InfluxDBTables``.
    """

    def __init__(self, name=None, **kwargs):
        """Initialize the InfluxDB handler.

        Parameters
        ----------
        name : str
            name of a handler instance
        **kwargs
            Handler arguments; stored unchanged on ``self.kwargs``. The
            ``connection_data`` entry (a dict) is the only one read here.
            Methods of this class look up the keys ``influxdb_url``,
            ``influxdb_token``, ``org`` (optional), ``influxdb_db_name`` and
            ``influxdb_table_name`` in it.
        """
        super().__init__(name)

        connection_data = kwargs.get("connection_data", {})
        if connection_data is None:
            # An explicit ``connection_data=None`` would otherwise make every
            # later key lookup fail with a TypeError; treat it as "no settings".
            connection_data = {}

        self.parser = parse_sql
        self.dialect = 'influxdb'
        self.connection_data = connection_data
        self.kwargs = kwargs
        self.connection = None
        self.is_connected = False

        influxdb_tables_data = InfluxDBTables(self)
        self._register_table("tables", influxdb_tables_data)

    def connect(self):
        """Set up the connection required by the handler.

        Reuses the stored client when the handler is already marked as
        connected; otherwise builds a new ``InfluxDBClient3`` from
        ``connection_data`` and marks the handler as connected.

        Returns
        -------
        InfluxDBClient3
            The client stored on ``self.connection``.

        Raises
        ------
        KeyError
            If ``influxdb_url`` or ``influxdb_token`` is missing from
            ``connection_data``.
        """
        if self.is_connected is True:
            return self.connection

        self.connection = InfluxDBClient3(
            host=self.connection_data['influxdb_url'],
            token=self.connection_data['influxdb_token'],
            org=self.connection_data.get('org'),
        )
        self.is_connected = True

        return self.connection

    def check_connection(self) -> StatusResponse:
        """Check connection to the handler.

        Calls ``connect()`` and reports whether it raised. Any exception is
        logged and returned as text in ``error_message`` instead of being
        propagated. ``self.is_connected`` is set to the outcome.

        Returns
        -------
        StatusResponse
            Status confirmation
        """
        response = StatusResponse(False)

        # NOTE(review): connect() only constructs the client in this class;
        # confirm whether that alone validates the server and credentials.
        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to InfluxDB API: {e}!")
            # Store the message text, not the exception object itself.
            response.error_message = str(e)

        self.is_connected = response.success

        return response

    def native_query(self, query: str) -> StatusResponse:
        """Receive and process a raw query.

        The query text is parsed with ``parse_sql`` and the resulting AST is
        passed to ``self.query``.

        Parameters
        ----------
        query : str
            query in a native format

        Returns
        -------
        StatusResponse
            Whatever ``self.query`` returns for the parsed query.
            NOTE(review): ``query`` is defined outside this class; confirm
            that the annotated return type matches it.
        """
        ast = parse_sql(query)
        return self.query(ast)

    def call_influxdb_tables(self, query):
        """Run a SQL query against InfluxDB and return the result.

        Parameters
        ----------
        query : str or None
            SQL text to execute. When ``None``, defaults to selecting every
            column from the table named by
            ``connection_data['influxdb_table_name']``.

        Returns
        -------
        The result of ``to_pandas()`` on the query result (documented in the
        original as a pd.DataFrame of the records).

        Raises
        ------
        KeyError
            If ``influxdb_db_name`` (or, for the default query,
            ``influxdb_table_name``) is missing from ``connection_data``.
        """
        influx_connection = self.connect()
        if query is None:
            # The table name is interpolated as-is from the handler settings.
            query = f"SELECT * FROM {self.connection_data['influxdb_table_name']}"

        table = influx_connection.query(
            query=query,
            database=self.connection_data['influxdb_db_name'],
            language='sql',
        )
        return table.to_pandas()