Coverage for mindsdb / integrations / handlers / duckdb_handler / duckdb_handler.py: 0%
81 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1import duckdb
2import pandas as pd
3from duckdb import DuckDBPyConnection
4from mindsdb_sql_parser import parse_sql
5from mindsdb_sql_parser.ast.base import ASTNode
6from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb.integrations.libs.base import DatabaseHandler
9from mindsdb.integrations.libs.response import RESPONSE_TYPE
10from mindsdb.integrations.libs.response import HandlerResponse as Response
11from mindsdb.integrations.libs.response import (
12 HandlerStatusResponse as StatusResponse,
13)
14from mindsdb.utilities import log
16logger = log.getLogger(__name__)
class DuckDBHandler(DatabaseHandler):
    """This handler handles connection and execution of the DuckDB statements."""

    name = 'duckdb'

    def __init__(self, name: str, **kwargs):
        """Store the connection settings; no connection is opened here.

        Args:
            name (str): Handler instance name, forwarded to the base class.
            **kwargs: Only ``connection_data`` is read. ``connect`` looks up
                the ``database`` and ``read_only`` keys in it.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'postgresql'
        self.connection_data = kwargs.get('connection_data')
        self.renderer = SqlalchemyRender('postgres')

        self.connection = None
        self.is_connected = False

    def __del__(self):
        # __init__ may have failed before `is_connected` was assigned, so do
        # not assume the attribute exists during finalization.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def _database_name(self):
        """Return the configured database name for log messages, or None.

        Used inside exception handlers, so it must never raise itself, even
        when ``connection_data`` is missing or has no ``database`` key.
        """
        connection_data = self.connection_data
        if isinstance(connection_data, dict):
            return connection_data.get('database')
        return None

    def connect(self) -> DuckDBPyConnection:
        """Connect to a DuckDB database.

        An already-open connection is reused instead of opening a new one.

        Returns:
            DuckDBPyConnection: The database connection.
        """
        if self.is_connected is True:
            return self.connection

        args = {
            'database': self.connection_data.get('database'),
            'read_only': self.connection_data.get('read_only'),
        }

        self.connection = duckdb.connect(**args)
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """Close the database connection."""
        if self.is_connected is False:
            return

        self.connection.close()
        self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """Check the connection to the DuckDB database.

        A connection opened only for this check is closed again afterwards.

        Returns:
            StatusResponse: Connection success status and error message if an error occurs.
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            # Resolve the database name defensively: indexing
            # connection_data here could raise and mask the original error.
            logger.error(
                f'Error connecting to DuckDB {self._database_name()}, {e}!'
            )
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """Execute a SQL query.

        Args:
            query (str): The SQL query to execute.

        Returns:
            Response: A TABLE response when the statement produced rows, an
                OK response when it produced none, or an ERROR response
                carrying the exception text when execution failed.
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        cursor = connection.cursor()

        try:
            cursor.execute(query)

            result = cursor.fetchall()
            if result:
                column_names = [x[0] for x in cursor.description]
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(result, columns=column_names),
                )
            else:
                connection.commit()
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(
                f'Error running query: {query} on {self._database_name()}!'
            )
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
        finally:
            # Always release the cursor, and the connection too when it was
            # opened only for this call.
            cursor.close()
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """Render and execute a SQL query.

        Args:
            query (ASTNode): The SQL query.

        Returns:
            Response: The query result.
        """
        query_str = self.renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """Get a list of all the tables in the database.

        Returns:
            Response: Names of the tables in the database, in a single
                ``table_name`` column. If the underlying query failed, its
                ERROR response is returned unchanged.
        """
        q = 'SHOW TABLES;'
        result = self.native_query(q)
        df = result.data_frame

        if df is None:
            # native_query attaches no data frame when the statement returned
            # no rows (a database without tables) or when it failed.
            if result.resp_type == RESPONSE_TYPE.OK:
                return Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(columns=['table_name']),
                )
            return result

        result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
        return result

    def get_columns(self, table_name: str) -> Response:
        """Get details about a table.

        Args:
            table_name (str): Name of the table to retrieve details of.

        Returns:
            Response: Details of the table.
        """
        # NOTE(review): table_name is interpolated into the statement as-is
        # (unquoted); confirm callers only pass trusted identifiers.
        query = f'DESCRIBE {table_name};'
        return self.native_query(query)