Coverage for mindsdb / integrations / handlers / dremio_handler / dremio_handler.py: 0%

91 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from typing import Optional 

2 

3import json 

4import time 

5import requests 

6import pandas as pd 

7 

8from mindsdb_sql_parser import parse_sql 

9from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

10from mindsdb.integrations.libs.base import DatabaseHandler 

11from sqlalchemy_dremio.base import DremioDialect 

12 

13from mindsdb_sql_parser.ast.base import ASTNode 

14 

15from mindsdb.utilities import log 

16from mindsdb.integrations.libs.response import ( 

17 HandlerStatusResponse as StatusResponse, 

18 HandlerResponse as Response, 

19 RESPONSE_TYPE 

20) 

21 

# Module-level logger used by the DremioHandler class below.
logger = log.getLogger(__name__)

23 

24 

class DremioHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Dremio statements.

    Statements are executed through Dremio's REST API:

    1. A session token is obtained from ``/apiv2/login``.
    2. The SQL is submitted as a job to ``/api/v3/sql``.
    3. The job is polled until it reaches a terminal state.
    4. Its results are read page by page from ``/api/v3/job/{id}/results``.
    """

    name = 'dremio'

    # Job states after which a Dremio job is not expected to change any more.
    _terminal_job_states = ('COMPLETED', 'FAILED', 'CANCELED')
    # Seconds to wait between two job-status polls.
    _poll_interval = 2
    # Page size used when reading job results (Dremio caps this at 500).
    _results_page_size = 500

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database;
                'host', 'port', 'username' and 'password' are read from it.
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'dremio'

        self.connection_data = connection_data
        self.kwargs = kwargs

        self.base_url = f"http://{self.connection_data['host']}:{self.connection_data['port']}"

        self.connection = None
        self.is_connected = False

    def __del__(self):
        if self.is_connected is True:
            self.disconnect()

    def connect(self) -> dict:
        """
        Log in to Dremio and build the headers for authenticated API calls.

        Returns:
            dict: HTTP headers with the session token
            (``Authorization: _dremio<token>``) and the JSON content type.

        Raises:
            requests.HTTPError: if Dremio rejects the login request.
            KeyError: if the login response does not contain a token.
        """
        payload = {
            'userName': self.connection_data['username'],
            'password': self.connection_data['password'],
        }

        # json.dumps escapes quotes/backslashes in the credentials; a
        # hand-built JSON string would break (or be injectable) on them.
        response = requests.post(
            self.base_url + '/apiv2/login',
            headers={'Content-Type': 'application/json'},
            data=json.dumps(payload),
        )
        # Surface a meaningful HTTP error instead of a bare KeyError on 'token'.
        response.raise_for_status()
        token = response.json()['token']

        self.is_connected = True
        return {
            'Authorization': '_dremio' + token,
            'Content-Type': 'application/json',
        }

    def disconnect(self):
        """
        Close any existing connections.

        Dremio's REST API is stateless from the handler's point of view, so
        this only resets the connection flag.
        """

        self.is_connected = False
        return

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.
        Returns:
            HandlerStatusResponse
        """

        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Dremio, {e}!')
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: a TABLE response with all result rows, or an
            ERROR response if logging in, submitting, running or reading the
            job fails.
        """

        need_to_close = self.is_connected is False

        try:
            # Login is inside the try so authentication failures are
            # reported as an ERROR response like any other failure.
            auth_headers = self.connect()
            job_id = self._submit_job(query, auth_headers)
            job = self._wait_for_job(job_id, auth_headers)

            if job['jobState'] == 'COMPLETED':
                response = self._fetch_results(job_id, auth_headers)
            else:
                logger.error('Job failed!')
                error_message = (
                    job.get('errorMessage')
                    or job.get('cancellationReason')
                    or f"Dremio job {job_id} finished in state {job['jobState']}"
                )
                response = Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=str(error_message)
                )

        except Exception as e:
            logger.error(f'Error running query: {query} on Dremio!')
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )

        if need_to_close is True:
            self.disconnect()

        return response

    def _submit_job(self, query: str, auth_headers: dict) -> str:
        """
        Submit ``query`` as a Dremio SQL job and return its job id.

        Raises:
            RuntimeError: if Dremio does not accept the job.
        """
        # json.dumps escapes quotes, backslashes and control characters.
        # Newlines are kept (not replaced by spaces), so a ``--`` comment
        # only extends to the end of its own line.
        sql_result = requests.post(
            self.base_url + '/api/v3/sql',
            headers=auth_headers,
            data=json.dumps({'sql': query}),
        )
        body = sql_result.json()

        if sql_result.status_code != 200 or 'id' not in body:
            logger.info('Job creation failed.')
            raise RuntimeError(
                body.get('errorMessage')
                or f'Job creation failed with HTTP status {sql_result.status_code}'
            )

        job_id = body['id']
        logger.info('Job creation successful. Job id is: ' + job_id)
        return job_id

    def _wait_for_job(self, job_id: str, auth_headers: dict) -> dict:
        """
        Poll the job until it reaches a terminal state.

        Returns:
            dict: the last job-status payload returned by Dremio.
        """
        logger.info('Waiting for the job to complete...')
        job_url = self.base_url + '/api/v3/job/' + job_id

        while True:
            job = requests.get(job_url, headers=auth_headers).json()
            # CANCELED is terminal as well; waiting only for COMPLETED/FAILED
            # would poll a cancelled job forever.
            if job['jobState'] in self._terminal_job_states:
                return job
            time.sleep(self._poll_interval)

    def _fetch_results(self, job_id: str, auth_headers: dict) -> Response:
        """
        Read every page of a completed job's results into a TABLE response.

        The results endpoint returns a limited number of rows per call, so
        pages are requested with increasing ``offset`` until ``rowCount``
        rows have been collected.
        """
        results_url = self.base_url + '/api/v3/job/' + job_id + '/results'
        rows = []
        columns = None
        offset = 0

        while True:
            page = requests.get(
                results_url,
                headers=auth_headers,
                params={'offset': offset, 'limit': self._results_page_size},
            ).json()

            if 'errorMessage' in page:
                return Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=str(page['errorMessage'])
                )

            if columns is None:
                # Taking the column names from the schema keeps them even
                # when the result has no rows.
                columns = [field['name'] for field in page.get('schema', [])]

            page_rows = page.get('rows', [])
            rows.extend(page_rows)
            offset += len(page_rows)

            # Stop on an empty page, or once all reported rows are read. If
            # 'rowCount' is absent, stop after the first page.
            if not page_rows or offset >= page.get('rowCount', offset):
                break

        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(rows, columns=columns or None)
        )

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """

        renderer = SqlalchemyRender(DremioDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.
        Returns:
            HandlerResponse
        """

        query = """
            SELECT
                TABLE_NAME,
                TABLE_SCHEMA,
                CASE
                    WHEN TABLE_TYPE = 'TABLE' THEN 'BASE TABLE'
                    ELSE TABLE_TYPE
                END AS TABLE_TYPE
            FROM INFORMATION_SCHEMA."TABLES"
            WHERE TABLE_TYPE <> 'SYSTEM_TABLE';
        """
        return self.native_query(query)

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse: on success, the DESCRIBE output with
            'COLUMN_NAME'/'DATA_TYPE' renamed to 'Field'/'Type'; on failure,
            the unchanged error response.
        """

        query = f"DESCRIBE {table_name}"
        result = self.native_query(query)
        # Error responses carry no data frame; renaming would crash on None.
        if result.data_frame is not None:
            result.data_frame = result.data_frame.rename(
                columns={'COLUMN_NAME': 'Field', 'DATA_TYPE': 'Type'}
            )
        return result