Coverage for mindsdb/integrations/handlers/databend_handler/databend_handler.py: 0%
89 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3import pandas as pd
4from databend_sqlalchemy import connector
6from mindsdb_sql_parser import parse_sql
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb.integrations.libs.base import DatabaseHandler
9from databend_sqlalchemy.databend_dialect import DatabendDialect
11from mindsdb_sql_parser.ast.base import ASTNode
13from mindsdb.utilities import log
14from mindsdb.integrations.libs.response import (
15 HandlerStatusResponse as StatusResponse,
16 HandlerResponse as Response,
17 RESPONSE_TYPE
18)
20logger = log.getLogger(__name__)
class DatabendHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Databend statements.
    """
    name = 'databend'

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database;
                connect() reads the 'user', 'password', 'host', 'port' and
                'database' keys
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'databend'

        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

    def __del__(self):
        # getattr guards against a partially constructed instance: if
        # __init__ raised before setting is_connected, the attribute
        # does not exist when the object is finalized.
        if getattr(self, 'is_connected', False):
            self.disconnect()

    def connect(self):
        """
        Set up the connection required by the handler.
        Returns:
            The connection object produced by connector.connect(); an
            already open connection is reused instead of opening a new one.
        """
        if self.is_connected:
            return self.connection

        params = self.connection_data
        # Local hosts are contacted without TLS; every other host requires it.
        if params['host'] in ('localhost', '127.0.0.1'):
            ssl_mode = 'disable'
        else:
            ssl_mode = 'require'

        # NOTE(review): user and password are interpolated verbatim, so
        # characters such as '@', ':' or '/' in them would change the URL's
        # structure — confirm whether the connector expects percent-encoding.
        self.connection = connector.connect(
            f"databend://{params['user']}:{params['password']}@{params['host']}:{params['port']}/{params['database']}?sslmode={ssl_mode}"
        )
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """
        Close any existing connections.
        Returns:
            None when there was no open connection, otherwise the updated
            is_connected flag (False).
        """
        if not self.is_connected:
            return

        self.connection.close()
        self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.
        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        # Only close the connection afterwards if this call opened it.
        need_to_close = not self.is_connected

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Databend, {e}!')
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                # The check failed, so stop treating the connection as open.
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: TABLE with a data frame when rows were fetched,
            OK when nothing was fetched, ERROR when execution raised.
        """
        # Only close the connection afterwards if this call opened it.
        need_to_close = not self.is_connected

        connection = self.connect()
        cursor = connection.cursor()

        try:
            cursor.execute(query)
            result = cursor.fetchall()
            if result:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(
                        result,
                        columns=[x[0] for x in cursor.description]
                    )
                )
            else:
                connection.commit()
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f'Error running query: {query} on Databend!')
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )
        finally:
            # Release the cursor (and the temporary connection) on every
            # exit path, including exceptions not caught above.
            cursor.close()
            if need_to_close:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender(DatabendDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.
        Returns:
            HandlerResponse: when a data frame is present it is reduced to a
            single 'table_name' column.
        """
        query = f"""
            SHOW TABLES IN {self.connection_data["database"]}
        """
        result = self.native_query(query)
        df = result.data_frame

        # data_frame is None for OK / ERROR responses; pass those through.
        if df is not None:
            source_column = f'Tables_in_{self.connection_data["database"]}'
            df = df[[source_column]]
            result.data_frame = df.rename(columns={source_column: 'table_name'})

        return result

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse: when a data frame is present its DESC columns
            are renamed to column_name / data_type / is_nullable /
            default_value / extra.
        """
        query = f"""
            DESC {self.connection_data["database"]}.{table_name}
        """
        result = self.native_query(query)
        df = result.data_frame

        # Guard like get_tables does: an ERROR or OK response carries no
        # data frame, and calling .rename on None would raise AttributeError
        # and hide the real error message.
        if df is not None:
            result.data_frame = df.rename(columns={
                'Field': 'column_name',
                'Type': 'data_type',
                'Null': 'is_nullable',
                'Default': 'default_value',
                'Extra': 'extra',
            })

        return result