Coverage for mindsdb / integrations / handlers / dummy_data_handler / dummy_data_handler.py: 82%

52 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import time 

2from typing import Optional, List 

3 

4import duckdb 

5from typing import Any 

6 

7from mindsdb.integrations.libs.base import DatabaseHandler 

8from mindsdb.integrations.libs.response import RESPONSE_TYPE, HandlerResponse, HandlerStatusResponse 

9from mindsdb_sql_parser.ast.base import ASTNode 

10from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

11 

12 

class DummyHandler(DatabaseHandler):
    """Database handler that runs queries against a DuckDB database.

    No connection is kept on the instance: every query opens its own DuckDB
    connection on ``db_path`` and closes it once the result has been read.
    """

    name = "dummy_data"

    def __init__(self, **kwargs):
        """Create the handler.

        Args:
            **kwargs: may contain ``connection_data``, a mapping whose optional
                ``db_path`` entry is the path later passed to ``duckdb.connect``.
        """
        super().__init__("dummy_data")
        # A missing or explicitly-None connection_data is treated as empty,
        # leaving db_path unset (None).
        args = kwargs.get("connection_data") or {}
        self.db_path = args.get("db_path")

    def connect(self):
        """Report whether a database path has been configured.

        Nothing is opened here; connections are created per query.

        Returns:
            bool: True when ``db_path`` is set, False otherwise.
        """
        return self.db_path is not None

    def disconnect(self):
        """Do nothing; there is no persistent connection to close."""
        return

    def check_connection(self) -> HandlerStatusResponse:
        """Check connection to the handler.

        Returns:
            HandlerStatusResponse: always reports success.
        """
        return HandlerStatusResponse(success=True)

    def native_query(self, query: Any, params: Optional[List] = None) -> HandlerResponse:
        """Execute a raw query on a fresh DuckDB connection.

        Args:
            query (Any): query in native format (an SQL string).
            params (Optional[List]): parameter sets for ``executemany``. When
                given, ``%s`` placeholders in ``query`` are rewritten to ``?``.

        Returns:
            HandlerResponse: a TABLE response holding the result data frame,
                or an OK response when a parameterized statement reports a
                negative ``rowcount``.

        Raises:
            Any error raised by DuckDB propagates to the caller; the
            connection is closed first.
        """
        con = duckdb.connect(self.db_path)
        try:
            if params is not None:
                query = query.replace("%s", "?")
                cur = con.executemany(query, params)
                # NOTE(review): the coverage run this listing came from never
                # saw a non-negative rowcount here - confirm whether DuckDB
                # ever reports one for executemany.
                if cur.rowcount < 0:
                    return HandlerResponse(RESPONSE_TYPE.OK)
                result_df = cur.fetchdf()
            else:
                result_df = con.execute(query).fetchdf()
        finally:
            # Always release the connection, including when the query fails.
            con.close()
        return HandlerResponse(RESPONSE_TYPE.TABLE, result_df)

    def query(self, query: ASTNode) -> HandlerResponse:
        """Render an AST query to SQL and execute it.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc

        Returns:
            HandlerResponse
        """
        # The statement is rendered with the "postgres" dialect renderer and
        # then handed to native_query together with its parameters.
        renderer = SqlalchemyRender("postgres")
        query_str, params = renderer.get_exec_params(query, with_failback=True)
        return self.native_query(query_str, params)

    def get_tables(self) -> HandlerResponse:
        """Get a list of all the tables in the database.

        Returns:
            HandlerResponse: result of ``SHOW TABLES`` with its first column
                renamed to ``table_name``.
        """
        result = self.native_query("SHOW TABLES;")
        df = result.data_frame
        result.data_frame = df.rename(columns={df.columns[0]: "table_name"})
        return result

    def get_columns(self, table_name: str) -> HandlerResponse:
        """Get details about a table.

        Args:
            table_name (str): Name of the table to retrieve details of.

        Returns:
            HandlerResponse: result of ``DESCRIBE`` for the table.
        """
        # NOTE: table_name is interpolated into the statement verbatim, so
        # callers must pass a trusted identifier.
        query = f"DESCRIBE {table_name};"
        return self.native_query(query)

    def subscribe(self, stop_event, callback, table_name, columns=None, **kwargs):
        """Block until ``stop_event`` is set, polling every 0.3 seconds.

        ``callback``, ``table_name``, ``columns`` and ``kwargs`` are accepted
        but not used.
        """
        while not stop_event.is_set():
            time.sleep(0.3)