Coverage for mindsdb / integrations / handlers / duckdb_handler / duckdb_handler.py: 0%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import duckdb 

2import pandas as pd 

3from duckdb import DuckDBPyConnection 

4from mindsdb_sql_parser import parse_sql 

5from mindsdb_sql_parser.ast.base import ASTNode 

6from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

7 

8from mindsdb.integrations.libs.base import DatabaseHandler 

9from mindsdb.integrations.libs.response import RESPONSE_TYPE 

10from mindsdb.integrations.libs.response import HandlerResponse as Response 

11from mindsdb.integrations.libs.response import ( 

12 HandlerStatusResponse as StatusResponse, 

13) 

14from mindsdb.utilities import log 

15 

# Module-level logger for this handler, obtained from MindsDB's `log` utility.
logger = log.getLogger(__name__)

17 

18 

class DuckDBHandler(DatabaseHandler):
    """This handler handles connection and execution of the DuckDB statements.

    A single ``duckdb`` connection is opened on demand by :meth:`connect`.
    Queries given as MindsDB AST nodes are rendered to SQL with the
    ``postgres`` dialect of ``SqlalchemyRender`` and then run through
    :meth:`native_query`.
    """

    name = 'duckdb'

    def __init__(self, name: str, **kwargs):
        """Create the handler.

        Args:
            name (str): Name of this handler instance.
            **kwargs: Handler arguments. ``connection_data`` (dict) may contain
                ``database`` (DuckDB database path) and ``read_only``.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'postgresql'
        # Fall back to an empty dict so the `.get` lookups used throughout the
        # handler never fail with AttributeError when no data was supplied.
        self.connection_data = kwargs.get('connection_data') or {}
        self.renderer = SqlalchemyRender('postgres')

        self.connection = None
        self.is_connected = False

    def __del__(self):
        """Close the connection, if still open, when the handler is collected."""
        # __del__ also runs when __init__ failed part-way, in which case
        # `is_connected` may never have been assigned.
        if getattr(self, 'is_connected', False) is True:
            self.disconnect()

    def connect(self) -> DuckDBPyConnection:
        """Connect to a DuckDB database.

        Reuses the existing connection when one is already open. When no
        ``database`` is configured, DuckDB's in-memory database
        (``':memory:'``) is used, matching ``duckdb.connect``'s own default.

        Returns:
            DuckDBPyConnection: The database connection.
        """

        if self.is_connected is True:
            return self.connection

        # `duckdb.connect` expects a str/path here, not None.
        database = self.connection_data.get('database') or ':memory:'
        read_only = self.connection_data.get('read_only')
        if read_only is None:
            read_only = False

        self.connection = duckdb.connect(database=database, read_only=read_only)
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """Close the database connection, if one is open."""

        if self.is_connected is False:
            return

        try:
            self.connection.close()
        finally:
            # Never report a connection as open after an attempt to close it.
            self.is_connected = False

    def check_connection(self) -> StatusResponse:
        """Check the connection to the DuckDB database.

        A connection opened only for this check is closed again afterwards.

        Returns:
            StatusResponse: Connection success status and error message if an error occurs.
        """

        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            # `.get` so that a missing key cannot raise KeyError from inside
            # this handler and mask the original connection error.
            logger.error(
                f'Error connecting to DuckDB {self.connection_data.get("database")}, {e}!'
            )
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """Execute a SQL query.

        Statements that return rows produce a TABLE response. Statements that
        return no rows are committed and produce an OK response. Execution
        errors are returned as an ERROR response rather than raised. A
        connection opened only for this call is closed before returning.

        Args:
            query (str): The SQL query to execute.

        Returns:
            Response: The query result.
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        cursor = None

        try:
            cursor = connection.cursor()
            cursor.execute(query)

            rows = cursor.fetchall()
            if rows:
                column_names = [column[0] for column in cursor.description]
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(rows, columns=column_names),
                )
            else:
                connection.commit()
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            # `.get` so a missing key cannot turn the error response into a
            # KeyError raised from this handler.
            logger.error(
                f'Error running query: {query} on {self.connection_data.get("database")}!'
            )
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
        finally:
            # Always release the cursor and any connection opened for this call.
            if cursor is not None:
                cursor.close()
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """Render and execute a SQL query.

        Args:
            query (ASTNode): The SQL query.

        Returns:
            Response: The query result.
        """

        query_str = self.renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """Get a list of all the tables in the database.

        Returns:
            Response: Names of the tables in the database, in a single
                ``table_name`` column. Errors from the underlying query are
                returned unchanged.
        """

        result = self.native_query('SHOW TABLES;')
        df = result.data_frame
        if df is None:
            if result.resp_type == RESPONSE_TYPE.ERROR:
                return result
            # An empty database yields no rows, which native_query reports as
            # a bare OK response with no frame. Return an empty table instead.
            return Response(
                RESPONSE_TYPE.TABLE,
                data_frame=pd.DataFrame(columns=['table_name']),
            )

        result.data_frame = df.rename(columns={df.columns[0]: 'table_name'})
        return result

    def get_columns(self, table_name: str) -> Response:
        """Get details about a table.

        Args:
            table_name (str): Name of the table to retrieve details of. It is
                quoted as a single identifier, so names containing spaces,
                reserved words or double quotes are handled.

        Returns:
            Response: Details of the table.
        """

        # Double-quote the identifier (escaping embedded quotes) instead of
        # interpolating it raw into the SQL text.
        quoted_name = '"' + table_name.replace('"', '""') + '"'
        return self.native_query(f'DESCRIBE {quoted_name};')