Coverage for mindsdb / integrations / handlers / druid_handler / druid_handler.py: 0%
91 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3import pandas as pd
4from pydruid.db import connect
6from mindsdb_sql_parser import parse_sql
7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
8from mindsdb.integrations.libs.base import DatabaseHandler
9from pydruid.db.sqlalchemy import DruidDialect
11from mindsdb_sql_parser import ASTNode
13from mindsdb.utilities import log
14from mindsdb.integrations.libs.response import (
15 HandlerStatusResponse as StatusResponse,
16 HandlerResponse as Response,
17 RESPONSE_TYPE,
18)
# Module-level logger; the handler uses it to report connection and query errors.
logger = log.getLogger(__name__)
class DruidHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Apache Druid statements.
    """

    name = "druid"

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                "host" and "port" are read by connect(); "user", "password",
                "path" and "scheme" are optional and get defaults here.
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = "druid"

        # Work on a copy so the defaults filled in below do not leak back into
        # the caller's dict, and accept None as the type hint allows.
        connection_data = dict(connection_data or {})
        connection_data.setdefault("user", None)
        connection_data.setdefault("password", None)
        connection_data.setdefault("path", "/druid/v2/sql/")
        connection_data.setdefault("scheme", "http")

        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

    def __del__(self):
        # __init__ may have raised before is_connected was assigned.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    def connect(self):
        """
        Set up the connection required by the handler, reusing an open one.
        Returns:
            The pydruid DB-API connection object.
        Raises:
            KeyError: if "host" or "port" is missing from connection_data.
        """
        if self.is_connected is True:
            return self.connection

        self.connection = connect(
            host=self.connection_data["host"],
            port=self.connection_data["port"],
            path=self.connection_data["path"],
            scheme=self.connection_data["scheme"],
            user=self.connection_data["user"],
            password=self.connection_data["password"],
        )
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """
        Close any existing connection.
        Returns:
            None if there was nothing to close, otherwise the new value of
            ``is_connected`` (False).
        """
        if self.is_connected is False:
            return

        try:
            self.connection.close()
        finally:
            # Mark as disconnected even if close() raised, so a later connect()
            # opens a fresh connection instead of reusing a broken one.
            self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler by running a trivial query.
        Returns:
            HandlerStatusResponse: success flag, plus error_message on failure.
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            conn = self.connect()
            cursor = conn.cursor()
            try:
                cursor.execute("select 1")  # raise exception if provided wrong credentials
            finally:
                cursor.close()
            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to Druid, {e}!")
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                # Release the failed connection instead of only dropping the flag;
                # never let a close error mask the original failure.
                try:
                    self.disconnect()
                except Exception as close_error:
                    logger.error(f"Error closing Druid connection, {close_error}!")
                    self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Execute a raw SQL query against Druid.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: TABLE with the fetched rows when rows are returned,
            OK when nothing is returned, ERROR (with message) on failure.
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        cursor = connection.cursor()

        try:
            cursor.execute(query)
            result = cursor.fetchall()
            if result:
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(result, columns=[x[0] for x in cursor.description]),
                )
            else:
                connection.commit()
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f"Error running query: {query} on Druid, {e}!")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
        finally:
            # Always release the cursor and any connection opened just for this call.
            cursor.close()
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Render a query AST to Druid SQL and execute it.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender(DruidDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.
        Returns:
            HandlerResponse: table_schema, table_name and table_type of every
            table outside the INFORMATION_SCHEMA and sys schemas.
        """
        query = """
            SELECT
                TABLE_SCHEMA AS table_schema,
                TABLE_NAME AS table_name,
                TABLE_TYPE AS table_type
            FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_SCHEMA not in ('INFORMATION_SCHEMA', 'sys')
        """
        result = self.native_query(query)

        return result

    @staticmethod
    def _escape_sql_literal(value) -> str:
        """Escape a value for use inside a single-quoted SQL string literal."""
        # Druid SQL (Apache Calcite) escapes a single quote by doubling it.
        return str(value).replace("'", "''")

    def get_columns(self, table_name: str, schema_name: Optional[str] = None) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
            schema_name (str, optional): schema of the table; defaults to "druid".
        Returns:
            HandlerResponse: FIELD and TYPE of every column of the table.
        """
        if schema_name is None:
            schema_name = "druid"

        # Names may come from user queries: escape them before interpolation.
        schema_literal = self._escape_sql_literal(schema_name)
        table_literal = self._escape_sql_literal(table_name)
        query = f"""
            SELECT
                COLUMN_NAME FIELD,
                DATA_TYPE TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE "TABLE_SCHEMA" = '{schema_literal}' AND "TABLE_NAME" = '{table_literal}'
        """
        result = self.native_query(query)

        return result