Coverage for mindsdb / integrations / handlers / clickhouse_handler / clickhouse_handler.py: 85%
98 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from urllib.parse import quote, urlencode
3import pandas as pd
4from sqlalchemy import create_engine
5from sqlalchemy.exc import SQLAlchemyError
6from clickhouse_sqlalchemy.drivers.base import ClickHouseDialect
7from mindsdb_sql_parser.ast.base import ASTNode
8from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
10from mindsdb.utilities import log
11from mindsdb.integrations.libs.base import DatabaseHandler
12from mindsdb.integrations.libs.response import (
13 HandlerStatusResponse as StatusResponse,
14 HandlerResponse as Response,
15 RESPONSE_TYPE,
16)
18logger = log.getLogger(__name__)
class ClickHouseHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the ClickHouse statements.
    """

    name = "clickhouse"

    def __init__(self, name, connection_data, **kwargs):
        """
        Store the connection settings and prepare the SQL renderer.

        Args:
            name: Handler name, forwarded to the base class constructor.
            connection_data: Mapping with the connection settings. ``connect``
                reads the keys "host", "port", "user", "password" and
                "database", plus the optional "protocol" (defaults to
                "native") and "verify" (defaults to True).
            **kwargs: Accepted for interface compatibility; not used here.
        """
        super().__init__(name)
        self.dialect = "clickhouse"
        self.connection_data = connection_data
        self.renderer = SqlalchemyRender(ClickHouseDialect)
        self.is_connected = False
        self.protocol = connection_data.get("protocol", "native")

    def __del__(self):
        # __del__ also runs on an instance whose __init__ failed before
        # `is_connected` was assigned, so read the flag defensively.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    def connect(self):
        """
        Establishes a connection to a ClickHouse server using SQLAlchemy.

        An already established connection is reused instead of opening a new one.

        Raises:
            SQLAlchemyError: If an error occurs while connecting to the database.

        Returns:
            The raw DBAPI connection obtained from ``engine.raw_connection()``.
        """
        if self.is_connected:
            return self.connection

        protocol = "clickhouse+native" if self.protocol == "native" else "clickhouse+http"
        host = quote(self.connection_data["host"])
        port = self.connection_data["port"]
        # safe="" so that "/" inside the credentials is percent-encoded as well;
        # quote() leaves "/" untouched by default, which would splice a raw
        # path separator into the userinfo part of the URL.
        user = quote(self.connection_data["user"], safe="")
        password = quote(self.connection_data["password"], safe="")
        database = quote(self.connection_data["database"])
        verify = self.connection_data.get("verify", True)
        url = f"{protocol}://{user}:{password}@{host}:{port}/{database}"

        # This is not redundant. Check https://clickhouse-sqlalchemy.readthedocs.io/en/latest/connection.html#http
        params = {}
        if self.protocol == "https":
            params["protocol"] = "https"
        if verify is False:
            params["verify"] = "false"
        if params:
            url = f"{url}?{urlencode(params)}"

        try:
            engine = create_engine(url)
            connection = engine.raw_connection()
            self.is_connected = True
            self.connection = connection
        except SQLAlchemyError as e:
            logger.error(f"Error connecting to ClickHouse {self.connection_data['database']}, {e}!")
            self.is_connected = False
            raise

        return self.connection

    def check_connection(self) -> StatusResponse:
        """
        Checks the status of the connection to the ClickHouse.

        Connects (or reuses the current connection) and runs ``select 1;``.
        If the handler was not connected before the check and the check
        succeeded, the connection is closed again afterwards.

        Returns:
            StatusResponse: An object containing the success status and an error message if an error occurs.
        """
        response = StatusResponse(False)
        # Remember whether this call is the one that opens the connection.
        need_to_close = not self.is_connected

        try:
            connection = self.connect()
            cur = connection.cursor()
            try:
                cur.execute("select 1;")
            finally:
                cur.close()
            response.success = True
        except SQLAlchemyError as e:
            logger.error(f"Error connecting to ClickHouse {self.connection_data['database']}, {e}!")
            response.error_message = str(e)
            self.is_connected = False

        if response.success is True and need_to_close:
            self.disconnect()

        return response

    def native_query(self, query: str) -> Response:
        """
        Executes a SQL query and returns the result.

        Args:
            query (str): The SQL query to be executed.

        Returns:
            Response: A TABLE response holding a DataFrame when the query
            returned rows, an OK response when it returned none, or an ERROR
            response carrying the message of a caught SQLAlchemyError.
        """
        connection = self.connect()
        cur = connection.cursor()
        try:
            cur.execute(query)
            result = cur.fetchall()
            if result:
                # Column names are the first item of each cursor description entry.
                response = Response(RESPONSE_TYPE.TABLE, pd.DataFrame(result, columns=[x[0] for x in cur.description]))
            else:
                # An empty result set is reported as a plain OK, without a data frame.
                response = Response(RESPONSE_TYPE.OK)
            connection.commit()
        except SQLAlchemyError as e:
            logger.error(f"Error running query: {query} on {self.connection_data['database']}!")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
            connection.rollback()
        finally:
            cur.close()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Render an AST query to a SQL string with the ClickHouse renderer and execute it.

        Args:
            query (ASTNode): The parsed query to render and run.

        Returns:
            Response: The result of ``native_query`` for the rendered SQL.
        """
        query_str = self.renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Get a list with all of the tables in the configured ClickHouse database.

        Returns:
            Response: The ``SHOW TABLES`` result; when it carries a data frame,
            its first column is renamed to "table_name".
        """
        q = f"SHOW TABLES FROM {self.connection_data['database']}"
        result = self.native_query(q)
        df = result.data_frame

        if df is not None:
            result.data_frame = df.rename(columns={df.columns[0]: "table_name"})

        return result

    def get_columns(self, table_name) -> Response:
        """
        Show details about the table.

        Args:
            table_name: Name of the table; it is interpolated into the
                ``DESCRIBE`` statement as-is, without quoting.

        Returns:
            Response: The result of the ``DESCRIBE`` statement.
        """
        q = f"DESCRIBE {table_name}"
        result = self.native_query(q)
        return result