Coverage for mindsdb / integrations / handlers / questdb_handler / questdb_handler.py: 0%

38 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import pandas as pd 

2import numpy as np 

3 

4from questdb.ingress import Sender 

5 

6from mindsdb.integrations.handlers.postgres_handler import Handler as PostgresHandler 

7from mindsdb.integrations.libs.response import ( 

8 HandlerResponse as Response, 

9 RESPONSE_TYPE 

10) 

11from mindsdb.utilities import log 

12 

13 

14logger = log.getLogger(__name__) 

15 

16 

17class QuestDBHandler(PostgresHandler): 

18 """ 

19 This handler handles connection and execution of the QuestDB statements. 

20 TODO: check the dialect for questdb 

21 """ 

22 name = 'questdb' 

23 

24 def __init__(self, name, **kwargs): 

25 super().__init__(name, **kwargs) 

26 

27 def get_tables(self): 

28 """ 

29 List all tabels in QuestDB 

30 """ 

31 query = "SHOW TABLES" 

32 response = super().native_query(query) 

33 return response 

34 

35 def get_columns(self, table_name): 

36 """ 

37 List information about the table 

38 """ 

39 query = f"SELECT * FROM tables() WHERE name='{table_name}';" 

40 response = super().native_query(query) 

41 return response 

42 

43 def qdb_connect(self): 

44 args = self.connection_args 

45 conf = f"http::addr={args['host']}:9000;username={args['user']};password={args['password']};" 

46 return Sender.from_conf(conf) 

47 

48 def insert(self, table_name: str, df: pd.DataFrame): 

49 

50 with self.qdb_connect() as sender: 

51 try: 

52 # find datetime column 

53 at_col = None 

54 for col, dtype in df.dtypes.items(): 

55 if np.issubdtype(dtype, np.datetime64): 

56 at_col = col 

57 if at_col is None: 

58 raise Exception(f'Unable to find datetime column: {df.dtypes}') 

59 

60 sender.dataframe(df, table_name=table_name, at=at_col) 

61 response = Response(RESPONSE_TYPE.OK) 

62 

63 except Exception as e: 

64 logger.error(f'Error running insert to {table_name} on {self.database}, {e}!') 

65 response = Response( 

66 RESPONSE_TYPE.ERROR, 

67 error_code=0, 

68 error_message=str(e) 

69 ) 

70 

71 return response