Coverage for mindsdb / integrations / handlers / druid_handler / druid_handler.py: 0%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3import pandas as pd 

4from pydruid.db import connect 

5 

6from mindsdb_sql_parser import parse_sql 

7from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

8from mindsdb.integrations.libs.base import DatabaseHandler 

9from pydruid.db.sqlalchemy import DruidDialect 

10 

11from mindsdb_sql_parser import ASTNode 

12 

13from mindsdb.utilities import log 

14from mindsdb.integrations.libs.response import ( 

15 HandlerStatusResponse as StatusResponse, 

16 HandlerResponse as Response, 

17 RESPONSE_TYPE, 

18) 

19 

# Module-level logger, named after this module and obtained from the MindsDB logging utility.
logger = log.getLogger(__name__)

21 

22 

class DruidHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Apache Druid statements.

    The connection is opened lazily by `connect()` and cached on the instance.
    Methods that find the handler disconnected open a connection for the
    duration of the call and close it again before returning.
    """

    name = "druid"

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                `host` and `port` are read by `connect()`; `user` and `password`
                default to None, `path` to "/druid/v2/sql/" and `scheme` to "http".
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = "druid"

        # Work on a copy so the caller's dict is never mutated, and accept None
        # (the parameter is declared Optional) by starting from an empty mapping.
        connection_data = dict(connection_data) if connection_data else {}
        connection_data.setdefault("user", None)
        connection_data.setdefault("password", None)
        connection_data.setdefault("path", "/druid/v2/sql/")
        connection_data.setdefault("scheme", "http")

        self.connection_data = connection_data
        self.kwargs = kwargs

        self.connection = None
        self.is_connected = False

    def __del__(self):
        # getattr guards against a partially initialised instance: __del__ still
        # runs when __init__ raised before `is_connected` was assigned.
        if getattr(self, "is_connected", False) is True:
            self.disconnect()

    def connect(self):
        """
        Set up the connection required by the handler.

        An already open connection is reused instead of opening a new one.

        Returns:
            The connection object created by `pydruid.db.connect`.
        Raises:
            KeyError: if `host` or `port` is missing from the connection data.
        """
        if self.is_connected is True:
            return self.connection

        self.connection = connect(
            host=self.connection_data["host"],
            port=self.connection_data["port"],
            path=self.connection_data["path"],
            scheme=self.connection_data["scheme"],
            user=self.connection_data["user"],
            password=self.connection_data["password"],
        )
        self.is_connected = True

        return self.connection

    def disconnect(self):
        """
        Close any existing connections.

        Returns:
            None if the handler was not connected, otherwise the updated
            `is_connected` flag (False).
        """
        if self.is_connected is False:
            return

        self.connection.close()
        self.is_connected = False
        return self.is_connected

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.

        Runs a trivial `select 1` statement; any exception is logged and
        reported through the response instead of being raised.

        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            connection = self.connect()
            cursor = connection.cursor()
            try:
                cursor.execute("select 1")  # raise exception if provided wrong credentials
            finally:
                # Release the probe cursor whether or not the statement succeeded.
                cursor.close()

            response.success = True
        except Exception as e:
            logger.error(f"Error connecting to Druid, {e}!")
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                # Drop the failing connection so the next call reconnects. Closing is
                # best-effort: the connection is already known to be misbehaving.
                try:
                    self.disconnect()
                except Exception:
                    self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.

        Errors raised while executing the query are logged and returned as an
        ERROR response. A query that yields no rows produces an OK response.

        Args:
            query (str): query in native format
        Returns:
            HandlerResponse
        """
        need_to_close = self.is_connected is False

        connection = self.connect()
        cursor = connection.cursor()

        try:
            cursor.execute(query)
            result = cursor.fetchall()
            if result:
                # The first item of each cursor.description entry is the column name.
                column_names = [column[0] for column in cursor.description]
                response = Response(
                    RESPONSE_TYPE.TABLE,
                    data_frame=pd.DataFrame(result, columns=column_names),
                )
            else:
                connection.commit()
                response = Response(RESPONSE_TYPE.OK)
        except Exception as e:
            logger.error(f"Error running query: {query} on Druid, {e}!")
            response = Response(RESPONSE_TYPE.ERROR, error_message=str(e))
        finally:
            # Always release the cursor, and the connection if this call opened it.
            cursor.close()
            if need_to_close is True:
                self.disconnect()

        return response

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.

        The AST is rendered to a SQL string with the Druid SQLAlchemy dialect and
        then executed through `native_query`.

        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender(DruidDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.

        Tables in the 'INFORMATION_SCHEMA' and 'sys' schemas are excluded.

        Returns:
            HandlerResponse
        """
        query = """
            SELECT
                TABLE_SCHEMA AS table_schema,
                TABLE_NAME AS table_name,
                TABLE_TYPE AS table_type
            FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_SCHEMA not in ('INFORMATION_SCHEMA', 'sys')
        """
        result = self.native_query(query)

        return result

    @staticmethod
    def _escape_literal(value) -> str:
        """Escape a value for embedding inside a single-quoted SQL string literal."""
        return str(value).replace("'", "''")

    def get_columns(self, table_name: str, schema_name: Optional[str] = None) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
            schema_name (str): schema that holds the table; defaults to "druid"
        Returns:
            HandlerResponse
        """
        if schema_name is None:
            schema_name = "druid"

        # Both names end up inside quoted literals, so single quotes are doubled
        # to keep a stray or malicious quote from terminating the literal early.
        schema_literal = self._escape_literal(schema_name)
        table_literal = self._escape_literal(table_name)

        query = f"""
            SELECT
                COLUMN_NAME FIELD,
                DATA_TYPE TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE "TABLE_SCHEMA" = '{schema_literal}' AND "TABLE_NAME" = '{table_literal}'
        """
        result = self.native_query(query)

        return result

207 

208 return result