Coverage for mindsdb / integrations / handlers / clickhouse_handler / clickhouse_handler.py: 85%

98 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from urllib.parse import quote, urlencode 

2 

3import pandas as pd 

4from sqlalchemy import create_engine 

5from sqlalchemy.exc import SQLAlchemyError 

6from clickhouse_sqlalchemy.drivers.base import ClickHouseDialect 

7from mindsdb_sql_parser.ast.base import ASTNode 

8from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

9 

10from mindsdb.utilities import log 

11from mindsdb.integrations.libs.base import DatabaseHandler 

12from mindsdb.integrations.libs.response import ( 

13 HandlerStatusResponse as StatusResponse, 

14 HandlerResponse as Response, 

15 RESPONSE_TYPE, 

16) 

17 

# Module-level logger for this handler, obtained via MindsDB's logging utilities.
logger = log.getLogger(__name__)

19 

20 

class ClickHouseHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the ClickHouse statements.

    A SQLAlchemy engine (``clickhouse+native`` or ``clickhouse+http`` dialect) is
    created on ``connect()``, and statements are executed through the engine's raw
    DB-API connection.
    """

    name = "clickhouse"

    def __init__(self, name, connection_data, **kwargs):
        """
        Args:
            name (str): Name of this handler instance.
            connection_data (dict): Connection parameters. ``connect()`` reads
                ``host``, ``port``, ``user``, ``password`` and ``database``, and
                optionally ``verify`` (default ``True``). ``protocol`` is read
                here (default ``"native"``).
            **kwargs: Accepted for interface compatibility; not used here.
        """
        super().__init__(name)
        self.dialect = "clickhouse"
        self.connection_data = connection_data
        self.renderer = SqlalchemyRender(ClickHouseDialect)
        self.is_connected = False
        # Populated by connect() and released again by disconnect().
        self.connection = None
        self._engine = None
        self.protocol = connection_data.get("protocol", "native")

    def __del__(self):
        # getattr guards against a partially initialised instance.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    def _build_url(self) -> str:
        """
        Build the SQLAlchemy connection URL from ``self.connection_data``.

        Returns:
            str: URL of the form ``clickhouse+<driver>://user:password@host:port/database[?params]``.
        """
        scheme = "clickhouse+native" if self.protocol == "native" else "clickhouse+http"
        host = quote(self.connection_data["host"])
        port = self.connection_data["port"]
        # safe="" so that "/" in credentials is percent-encoded as well and cannot
        # be mistaken for a URL delimiter (quote() leaves "/" alone by default).
        user = quote(str(self.connection_data["user"]), safe="")
        password = quote(str(self.connection_data["password"]), safe="")
        database = quote(self.connection_data["database"])
        url = f"{scheme}://{user}:{password}@{host}:{port}/{database}"

        # These query parameters are not redundant: the HTTP driver needs them to
        # use TLS and to control certificate verification. See
        # https://clickhouse-sqlalchemy.readthedocs.io/en/latest/connection.html#http
        params = {}
        if self.protocol == "https":
            params["protocol"] = "https"
        if self.connection_data.get("verify", True) is False:
            params["verify"] = "false"
        if params:
            url = f"{url}?{urlencode(params)}"
        return url

    def connect(self):
        """
        Establishes a connection to a ClickHouse server using SQLAlchemy.

        If a connection is already open it is returned as-is.

        Raises:
            SQLAlchemyError: If an error occurs while connecting to the database.
                Any other exception raised while connecting is logged and
                re-raised unchanged.

        Returns:
            Connection: A raw (DB-API) connection obtained from the SQLAlchemy engine.
        """
        if self.is_connected:
            return self.connection

        url = self._build_url()

        engine = None
        try:
            engine = create_engine(url)
            connection = engine.raw_connection()
        except Exception as e:
            logger.error(f"Error connecting to ClickHouse {self.connection_data['database']}, {e}!")
            self.is_connected = False
            # Don't leave a half-initialised engine/pool behind.
            if engine is not None:
                engine.dispose()
            raise

        self._engine = engine
        self.connection = connection
        self.is_connected = True
        return self.connection

    def disconnect(self):
        """
        Close the current connection (if any) and dispose of the SQLAlchemy engine.

        Safe to call when no connection is open; always leaves
        ``is_connected`` set to ``False``.
        """
        connection = getattr(self, "connection", None)
        engine = getattr(self, "_engine", None)
        self.connection = None
        self._engine = None
        self.is_connected = False

        try:
            if connection is not None:
                connection.close()
        except Exception as e:
            # Closing is best-effort (e.g. the server may already have dropped us).
            logger.warning(f"Error closing ClickHouse connection: {e}")
        finally:
            if engine is not None:
                engine.dispose()

    def check_connection(self) -> StatusResponse:
        """
        Checks the status of the connection to ClickHouse by running ``select 1``.

        A connection opened only for this check is closed again afterwards. On
        failure the connection is always closed, so the next ``connect()`` starts
        from a clean state.

        Returns:
            StatusResponse: An object containing the success status and an error message if an error occurs.
        """
        response = StatusResponse(False)
        need_to_close = not self.is_connected

        try:
            connection = self.connect()
            cursor = connection.cursor()
            try:
                cursor.execute("select 1;")
            finally:
                cursor.close()
            response.success = True
        except Exception as e:
            # Errors raised through the raw DB-API cursor come from the driver and
            # are not wrapped in SQLAlchemyError, so catch broadly to report a
            # status instead of raising.
            logger.error(f"Error connecting to ClickHouse {self.connection_data['database']}, {e}!")
            response.error_message = str(e)

        if need_to_close or not response.success:
            self.disconnect()

        return response

    def native_query(self, query: str) -> Response:
        """
        Executes a SQL query and returns the result.

        Args:
            query (str): The SQL query to be executed.

        Returns:
            Response: A TABLE response when the query returned rows, an OK response
            when it returned none, or an ERROR response carrying the error message.

        Raises:
            Exception: Only errors from establishing the connection (``connect()``)
                propagate; errors from executing the query become an ERROR response.
        """
        connection = self.connect()
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
            if rows:
                columns = [column[0] for column in cursor.description]
                response = Response(RESPONSE_TYPE.TABLE, pd.DataFrame(rows, columns=columns))
            else:
                # NOTE(review): a SELECT returning zero rows also lands here and
                # loses its column names - confirm whether callers rely on that.
                response = Response(RESPONSE_TYPE.OK)
            connection.commit()
        except Exception as e:
            # Driver (DB-API) errors from cursor.execute() are not SQLAlchemyError
            # subclasses; catch broadly so every query failure becomes an ERROR response.
            logger.error(f"Error running query: {query} on {self.connection_data['database']}!")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
            try:
                connection.rollback()
            except Exception as rollback_error:
                # Best-effort: a failing rollback must not hide the original query error.
                logger.warning(f"Error rolling back ClickHouse connection: {rollback_error}")
        finally:
            cursor.close()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Render a parsed SQL AST to a ClickHouse SQL string and execute it.

        Args:
            query (ASTNode): The parsed query to run.

        Returns:
            Response: See ``native_query``.
        """
        query_str = self.renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    @staticmethod
    def _quote_identifier(identifier) -> str:
        """
        Quote a ClickHouse identifier with backticks.

        Backslashes and backticks inside the name are backslash-escaped. This lets
        names that are not plain identifiers (e.g. containing ``-``) be used safely.
        """
        escaped = str(identifier).replace("\\", "\\\\").replace("`", "\\`")
        return f"`{escaped}`"

    def get_tables(self) -> Response:
        """
        Get a list of all the tables in the configured ClickHouse database.

        Returns:
            Response: On success, a TABLE response whose first column is renamed to
            ``table_name``; otherwise the response from ``native_query`` unchanged.
        """
        database = self._quote_identifier(self.connection_data["database"])
        result = self.native_query(f"SHOW TABLES FROM {database}")
        df = result.data_frame

        if df is not None:
            result.data_frame = df.rename(columns={df.columns[0]: "table_name"})

        return result

    def get_columns(self, table_name) -> Response:
        """
        Show details about the columns of a table (ClickHouse ``DESCRIBE``).

        Args:
            table_name (str): Table name, inserted into the statement verbatim (so a
                qualified ``db.table`` name is also accepted).

        Returns:
            Response: See ``native_query``.
        """
        q = f"DESCRIBE {table_name}"
        result = self.native_query(q)
        return result