Coverage for mindsdb / integrations / handlers / databend_handler / databend_handler.py: 0%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3import pandas as pd 

4from databend_sqlalchemy import connector 

5 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8from mindsdb.integrations.libs.base import DatabaseHandler 

9from databend_sqlalchemy.databend_dialect import DatabendDialect 

10 

11from mindsdb_sql_parser.ast.base import ASTNode 

12 

13from mindsdb.utilities import log 

14from mindsdb.integrations.libs.response import ( 

15 HandlerStatusResponse as StatusResponse, 

16 HandlerResponse as Response, 

17 RESPONSE_TYPE 

18) 

19 

20logger = log.getLogger(__name__) 

21 

22 

23class DatabendHandler(DatabaseHandler): 

24 """ 

25 This handler handles connection and execution of the Databend statements. 

26 """ 

27 name = 'databend' 

28 

29 def __init__(self, name: str, connection_data: Optional[dict], **kwargs): 

30 """ 

31 Initialize the handler. 

32 Args: 

33 name (str): name of particular handler instance 

34 connection_data (dict): parameters for connecting to the database 

35 **kwargs: arbitrary keyword arguments. 

36 """ 

37 super().__init__(name) 

38 self.parser = parse_sql 

39 self.dialect = 'databend' 

40 

41 self.connection_data = connection_data 

42 self.kwargs = kwargs 

43 

44 self.connection = None 

45 self.is_connected = False 

46 

47 def __del__(self): 

48 if self.is_connected is True: 

49 self.disconnect() 

50 

51 def connect(self) -> StatusResponse: 

52 """ 

53 Set up the connection required by the handler. 

54 Returns: 

55 HandlerStatusResponse 

56 """ 

57 

58 if self.is_connected is True: 

59 return self.connection 

60 

61 if self.connection_data['host'] == 'localhost' or self.connection_data['host'] == '127.0.0.1': 

62 ssl_mode = 'disable' 

63 else: 

64 ssl_mode = 'require' 

65 

66 self.connection = connector.connect( 

67 f"databend://{self.connection_data['user']}:{self.connection_data['password']}@{self.connection_data['host']}:{self.connection_data['port']}/{self.connection_data['database']}?sslmode={ssl_mode}" 

68 ) 

69 self.is_connected = True 

70 

71 return self.connection 

72 

73 def disconnect(self): 

74 """ 

75 Close any existing connections. 

76 """ 

77 

78 if self.is_connected is False: 

79 return 

80 

81 self.connection.close() 

82 self.is_connected = False 

83 return self.is_connected 

84 

85 def check_connection(self) -> StatusResponse: 

86 """ 

87 Check connection to the handler. 

88 Returns: 

89 HandlerStatusResponse 

90 """ 

91 

92 response = StatusResponse(False) 

93 need_to_close = self.is_connected is False 

94 

95 try: 

96 self.connect() 

97 response.success = True 

98 except Exception as e: 

99 logger.error(f'Error connecting to Databend, {e}!') 

100 response.error_message = str(e) 

101 finally: 

102 if response.success is True and need_to_close: 

103 self.disconnect() 

104 if response.success is False and self.is_connected is True: 

105 self.is_connected = False 

106 

107 return response 

108 

109 def native_query(self, query: str) -> StatusResponse: 

110 """ 

111 Receive raw query and act upon it somehow. 

112 Args: 

113 query (str): query in native format 

114 Returns: 

115 HandlerResponse 

116 """ 

117 

118 need_to_close = self.is_connected is False 

119 

120 connection = self.connect() 

121 cursor = connection.cursor() 

122 

123 try: 

124 cursor.execute(query) 

125 result = cursor.fetchall() 

126 if result: 

127 response = Response( 

128 RESPONSE_TYPE.TABLE, 

129 data_frame=pd.DataFrame( 

130 result, 

131 columns=[x[0] for x in cursor.description] 

132 ) 

133 ) 

134 else: 

135 connection.commit() 

136 response = Response(RESPONSE_TYPE.OK) 

137 except Exception as e: 

138 logger.error(f'Error running query: {query} on Databend!') 

139 response = Response( 

140 RESPONSE_TYPE.ERROR, 

141 error_message=str(e) 

142 ) 

143 

144 cursor.close() 

145 if need_to_close is True: 

146 self.disconnect() 

147 

148 return response 

149 

150 def query(self, query: ASTNode) -> StatusResponse: 

151 """ 

152 Receive query as AST (abstract syntax tree) and act upon it somehow. 

153 Args: 

154 query (ASTNode): sql query represented as AST. May be any kind 

155 of query: SELECT, INTSERT, DELETE, etc 

156 Returns: 

157 HandlerResponse 

158 """ 

159 renderer = SqlalchemyRender(DatabendDialect) 

160 query_str = renderer.get_string(query, with_failback=True) 

161 return self.native_query(query_str) 

162 

163 def get_tables(self) -> StatusResponse: 

164 """ 

165 Return list of entities that will be accessible as tables. 

166 Returns: 

167 HandlerResponse 

168 """ 

169 

170 query = f""" 

171 SHOW TABLES IN {self.connection_data["database"]} 

172 """ 

173 result = self.native_query(query) 

174 df = result.data_frame 

175 

176 if df is not None: 

177 df = df[[f'Tables_in_{self.connection_data["database"]}']] 

178 result.data_frame = df.rename(columns={f'Tables_in_{self.connection_data["database"]}': 'table_name'}) 

179 

180 return result 

181 

182 def get_columns(self, table_name: str) -> StatusResponse: 

183 """ 

184 Returns a list of entity columns. 

185 Args: 

186 table_name (str): name of one of tables returned by self.get_tables() 

187 Returns: 

188 HandlerResponse 

189 """ 

190 

191 query = f""" 

192 DESC {self.connection_data["database"]}.{table_name} 

193 """ 

194 result = self.native_query(query) 

195 df = result.data_frame 

196 

197 result.data_frame = df.rename(columns={'Field': 'column_name', 'Type': 'data_type', 'Null': 'is_nullable', 'Default': 'default_value', 'Extra': 'extra'}) 

198 

199 return result