Coverage for mindsdb / integrations / handlers / influxdb_handler / influxdb_tables.py: 0%
36 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import pandas as pd
3from typing import List
5from mindsdb.integrations.libs.api_handler import APITable
6from mindsdb.utilities import log
8from mindsdb_sql_parser import ast
9from mindsdb.integrations.utilities.handlers.query_utilities.select_query_utilities import SELECTQueryParser
12logger = log.getLogger(__name__)
15class InfluxDBTables(APITable):
16 """InfluxDB Tables implementation"""
18 def select(self, query: ast.Select) -> pd.DataFrame:
19 """Pulls data from the InfluxDB "query" API endpoint
20 Parameters
21 ----------
22 query : ast.Select
23 Given SQL SELECT query
24 Returns
25 -------
26 pd.DataFrame of particular InfluxDB table matching the query
27 Raises
28 ------
29 ValueError
30 If the query contains an unsupported condition
31 """
33 table_name = self.handler.connection_data['influxdb_table_name']
34 select_statement_parser = SELECTQueryParser(
35 query,
36 "tables",
37 self.get_columns()
38 )
39 selected_columns, where_conditions, order_by_conditions, _ = select_statement_parser.parse_query()
41 try:
42 selected_columns.remove("name")
43 selected_columns.remove("tags")
44 except Exception as e:
45 logger.warn(e)
47 formatted_query = self.get_select_query(table_name, selected_columns, where_conditions, order_by_conditions, query.limit)
48 influxdb_tables_df = self.handler.call_influxdb_tables(formatted_query)
50 return influxdb_tables_df
52 def get_columns(self) -> List[str]:
53 """Gets all columns to be returned in pandas DataFrame responses
54 Returns
55 -------
56 List[str]
57 List of columns
58 """
60 dataframe = self.handler.call_influxdb_tables(f"SELECT * FROM {self.handler.connection_data['influxdb_table_name']} LIMIT 1")
62 return list(dataframe.columns)
64 def get_select_query(self, table_name, selected_columns, where_conditions, order_by_conditions, result_limit):
65 """Gets Well formed Query
66 Returns
67 -------
68 str
69 """
70 columns = ", ".join([f'"{column}"' for column in selected_columns])
71 query = f'SELECT {columns} FROM "{table_name}"'
72 if (where_conditions is not None and len(where_conditions) > 0):
73 query += " WHERE "
74 query += " AND ".join([f"{i[1]} {i[0]} {i[2]}" for i in where_conditions])
75 if (order_by_conditions != {} and order_by_conditions['columns'] is not None and len(order_by_conditions['columns']) > 0):
76 query += " ORDER BY "
77 query += ", ".join([f'{column_name} {"ASC"if asc else "DESC"}' for column_name, asc in zip(order_by_conditions['columns'], order_by_conditions['ascending'])])
78 if (result_limit is not None):
79 query += f" LIMIT {result_limit}"
80 query += ";"
81 return query