Coverage for mindsdb / integrations / handlers / dummy_data_handler / dummy_data_handler.py: 82%
52 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import time
2from typing import Optional, List
4import duckdb
5from typing import Any
7from mindsdb.integrations.libs.base import DatabaseHandler
8from mindsdb.integrations.libs.response import RESPONSE_TYPE, HandlerResponse, HandlerStatusResponse
9from mindsdb_sql_parser.ast.base import ASTNode
10from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
class DummyHandler(DatabaseHandler):
    """Database handler that runs queries against a DuckDB database.

    The database location is taken from ``connection_data["db_path"]``.
    A fresh DuckDB connection is opened for every native query and closed
    again before the call returns, so the handler keeps no live connection
    between calls.
    """

    name = "dummy_data"

    def __init__(self, **kwargs):
        """Create the handler.

        Args:
            **kwargs: may contain ``connection_data``, a mapping whose
                optional ``db_path`` entry is the database path handed to
                ``duckdb.connect``.
        """
        super().__init__("dummy_data")
        # ``or {}`` also covers an explicit ``connection_data=None``, which
        # would otherwise fail on the membership test.
        args = kwargs.get("connection_data") or {}
        self.db_path = args.get("db_path")

    def connect(self):
        """Set up any connections required by the handler.

        No connection is actually opened here; one is created per query.

        Returns:
            bool: True when a database path has been configured.
        """
        return self.db_path is not None

    def disconnect(self):
        """Close any existing connections.

        Nothing to do: connections are closed at the end of each query.
        """
        return

    def check_connection(self) -> HandlerStatusResponse:
        """Check connection to the handler.

        Returns:
            HandlerStatusResponse: always reports success.
        """
        return HandlerStatusResponse(success=True)

    def native_query(self, query: Any, params: Optional[List] = None) -> HandlerResponse:
        """Run a raw query against the DuckDB database.

        Args:
            query (Any): query in native format (an SQL string).
            params (Optional[List]): when given, the query is executed with
                ``executemany`` using these parameter sets, after ``%s``
                placeholders are rewritten to DuckDB's ``?`` style.

        Returns:
            HandlerResponse: a TABLE response carrying the result data
            frame, or an OK response when a parameterised statement
            reports a negative row count.
        """
        con = duckdb.connect(self.db_path)
        try:
            if params is None:
                result_df = con.execute(query).fetchdf()
            else:
                query = query.replace("%s", "?")
                cur = con.executemany(query, params)
                # NOTE(review): a negative rowcount is treated as "no result
                # set to fetch"; confirm against the DuckDB version in use.
                if cur.rowcount < 0:
                    return HandlerResponse(RESPONSE_TYPE.OK)
                result_df = cur.fetchdf()
        finally:
            # Always release the connection, including when execution or
            # fetching raises.
            con.close()
        return HandlerResponse(RESPONSE_TYPE.TABLE, result_df)

    def query(self, query: ASTNode) -> HandlerResponse:
        """Receive a query as AST (abstract syntax tree) and execute it.

        The AST is rendered with the "postgres" dialect renderer into a
        query string plus parameters, which are passed to ``native_query``.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc.

        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender("postgres")
        query_str, params = renderer.get_exec_params(query, with_failback=True)
        return self.native_query(query_str, params)

    def get_tables(self) -> HandlerResponse:
        """Get a list of all the tables in the database.

        Returns:
            HandlerResponse: result of ``SHOW TABLES`` with its first column
            renamed to ``table_name``.
        """
        result = self.native_query("SHOW TABLES;")
        df = result.data_frame
        result.data_frame = df.rename(columns={df.columns[0]: "table_name"})
        return result

    def get_columns(self, table_name: str) -> HandlerResponse:
        """Get details about a table.

        Args:
            table_name (str): Name of the table to retrieve details of.

        Returns:
            HandlerResponse: result of ``DESCRIBE`` for the table.
        """
        # NOTE(review): table_name is interpolated into the statement as-is;
        # callers must not pass untrusted input here.
        query = f"DESCRIBE {table_name};"
        return self.native_query(query)

    def subscribe(self, stop_event, callback, table_name, columns=None, **kwargs):
        """Block until ``stop_event`` is set, polling every 0.3 seconds.

        ``callback``, ``table_name``, ``columns`` and ``kwargs`` are accepted
        but not used; the callback is never invoked.
        """
        while not stop_event.is_set():
            time.sleep(0.3)