Coverage for mindsdb / integrations / handlers / influxdb_handler / influxdb_tables.py: 0%

36 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2 

3from typing import List 

4 

5from mindsdb.integrations.libs.api_handler import APITable 

6from mindsdb.utilities import log 

7 

8from mindsdb_sql_parser import ast 

9from mindsdb.integrations.utilities.handlers.query_utilities.select_query_utilities import SELECTQueryParser 

10 

11 

12logger = log.getLogger(__name__) 

13 

14 

class InfluxDBTables(APITable):
    """InfluxDB Tables implementation.

    Turns a parsed SQL ``SELECT`` into an InfluxDB query string and executes it
    through ``self.handler.call_influxdb_tables``. The measurement that is
    queried is read from ``self.handler.connection_data['influxdb_table_name']``.
    """

    # Columns that are stripped from the requested projection before the
    # query text is built.
    # NOTE(review): presumably these are response metadata rather than
    # measurement fields -- confirm against the handler.
    _METADATA_COLUMNS = ("name", "tags")

    def select(self, query: ast.Select) -> pd.DataFrame:
        """Pulls data from the InfluxDB "query" API endpoint

        Parameters
        ----------
        query : ast.Select
           Given SQL SELECT query

        Returns
        -------
        pd.DataFrame of particular InfluxDB table matching the query

        Raises
        ------
        ValueError
            If the query contains an unsupported condition
            (NOTE(review): raised by the query parser, which is not visible
            here -- confirm).
        """
        table_name = self.handler.connection_data['influxdb_table_name']
        select_statement_parser = SELECTQueryParser(
            query,
            "tables",
            self.get_columns()
        )
        selected_columns, where_conditions, order_by_conditions, _ = select_statement_parser.parse_query()

        # Drop each metadata column independently. Removing them one after the
        # other inside a single try block left "tags" in the projection
        # whenever "name" had not been selected.
        selected_columns = [
            column for column in selected_columns
            if column not in self._METADATA_COLUMNS
        ]

        formatted_query = self.get_select_query(
            table_name,
            selected_columns,
            where_conditions,
            order_by_conditions,
            query.limit,
        )
        return self.handler.call_influxdb_tables(formatted_query)

    def get_columns(self) -> List[str]:
        """Gets all columns to be returned in pandas DataFrame responses

        Fetches a single row from the configured measurement and reports the
        column labels of the resulting DataFrame.

        Returns
        -------
        List[str]
            List of columns
        """
        table_name = self.handler.connection_data['influxdb_table_name']
        # Quote the measurement the same way get_select_query does so both
        # code paths accept the same set of measurement names.
        dataframe = self.handler.call_influxdb_tables(
            f"SELECT * FROM {self._quote_identifier(table_name)} LIMIT 1"
        )

        return list(dataframe.columns)

    def get_select_query(self, table_name: str, selected_columns: List[str], where_conditions, order_by_conditions, result_limit) -> str:
        """Gets Well formed Query

        Parameters
        ----------
        table_name : str
            Measurement to read from; emitted double-quoted.
        selected_columns : List[str]
            Columns to project; each is emitted double-quoted. When empty or
            None, ``*`` is selected instead of producing an invalid statement.
        where_conditions : list or None
            Sequence of ``(operator, column, value)`` triples joined with AND.
            ``str`` values are emitted as single-quoted, escaped literals; any
            other value is interpolated unchanged.
        order_by_conditions : dict or None
            Mapping with parallel ``'columns'`` and ``'ascending'`` sequences.
            An empty or None mapping adds no ORDER BY clause.
        result_limit
            Value interpolated after LIMIT; skipped when None.

        Returns
        -------
        str
            The query text, terminated with ``;``.
        """
        if selected_columns:
            columns = ", ".join(self._quote_identifier(column) for column in selected_columns)
        else:
            # An empty projection would render as 'SELECT  FROM ...'.
            columns = "*"

        query = f"SELECT {columns} FROM {self._quote_identifier(table_name)}"

        if where_conditions:
            predicates = [
                f"{column} {operator} {self._format_literal(value)}"
                for operator, column, value in where_conditions
            ]
            query += " WHERE " + " AND ".join(predicates)

        # Truthiness also covers None, which the previous '!= {}' comparison
        # let through and then failed on when subscripting.
        order_columns = order_by_conditions.get('columns') if order_by_conditions else None
        if order_columns:
            order_terms = [
                f"{column_name} {'ASC' if asc else 'DESC'}"
                for column_name, asc in zip(order_columns, order_by_conditions['ascending'])
            ]
            query += " ORDER BY " + ", ".join(order_terms)

        if result_limit is not None:
            query += f" LIMIT {result_limit}"

        return query + ";"

    @staticmethod
    def _quote_identifier(name) -> str:
        """Returns `name` as a double-quoted identifier with `\\` and `"` escaped."""
        escaped = str(name).replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    @staticmethod
    def _format_literal(value) -> str:
        """Renders a comparison value for the WHERE clause.

        Strings become single-quoted literals with `\\` and `'` escaped so
        they are not read as identifiers and cannot terminate the literal
        early. Every other value keeps its plain ``str()`` rendering.
        """
        if isinstance(value, str):
            escaped = value.replace("\\", "\\\\").replace("'", "\\'")
            return f"'{escaped}'"
        return str(value)