Coverage for mindsdb / integrations / handlers / dremio_handler / dremio_handler.py: 0%
91 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from typing import Optional
3import json
4import time
5import requests
6import pandas as pd
8from mindsdb_sql_parser import parse_sql
9from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender
10from mindsdb.integrations.libs.base import DatabaseHandler
11from sqlalchemy_dremio.base import DremioDialect
13from mindsdb_sql_parser.ast.base import ASTNode
15from mindsdb.utilities import log
16from mindsdb.integrations.libs.response import (
17 HandlerStatusResponse as StatusResponse,
18 HandlerResponse as Response,
19 RESPONSE_TYPE
20)
22logger = log.getLogger(__name__)
class DremioHandler(DatabaseHandler):
    """
    This handler handles connection and execution of the Dremio statements.

    All communication goes through Dremio's REST API: a login call yields a
    token, SQL is submitted as an asynchronous job, the job is polled until it
    reaches a terminal state and its results are then downloaded page by page.
    """

    name = 'dremio'

    # Upper bound, in seconds, for every single HTTP call so that an
    # unreachable server cannot hang the handler forever.
    _REQUEST_TIMEOUT = 60
    # Pause, in seconds, between two polls of a running job.
    _POLL_INTERVAL = 2
    # Job states after which the job will not change any more. Polling only
    # for COMPLETED/FAILED would spin forever on a cancelled job.
    _TERMINAL_JOB_STATES = ('COMPLETED', 'FAILED', 'CANCELED')
    # Rows requested per results page (the results endpoint is paginated).
    _RESULTS_PAGE_SIZE = 500

    def __init__(self, name: str, connection_data: Optional[dict], **kwargs):
        """
        Initialize the handler.
        Args:
            name (str): name of particular handler instance
            connection_data (dict): parameters for connecting to the database.
                'host' and 'port' are read here; 'username' and 'password'
                are read when connect() is called.
            **kwargs: arbitrary keyword arguments.
        """
        super().__init__(name)
        self.parser = parse_sql
        self.dialect = 'dremio'

        self.connection_data = connection_data
        self.kwargs = kwargs

        self.base_url = f"http://{self.connection_data['host']}:{self.connection_data['port']}"

        self.connection = None
        self.is_connected = False

    def __del__(self):
        if self.is_connected is True:
            self.disconnect()

    def connect(self) -> dict:
        """
        Log in to Dremio and build the headers for authenticated requests.
        Returns:
            dict: 'Authorization' and 'Content-Type' headers to send with
                every subsequent API call.
        Raises:
            requests.HTTPError: if the login request is rejected.
        """
        # json.dumps escapes quotes, backslashes and control characters in the
        # credentials; hand-built JSON breaks on such passwords.
        payload = json.dumps({
            'userName': self.connection_data['username'],
            'password': self.connection_data['password'],
        })

        response = requests.post(
            self.base_url + '/apiv2/login',
            headers={'Content-Type': 'application/json'},
            data=payload,
            timeout=self._REQUEST_TIMEOUT,
        )
        # Surface the HTTP failure itself instead of a KeyError on 'token'.
        response.raise_for_status()

        auth_headers = {
            'Authorization': '_dremio' + response.json()['token'],
            'Content-Type': 'application/json',
        }
        # Record the successful login so that need_to_close / __del__ /
        # check_connection bookkeeping actually reflects the state.
        self.is_connected = True
        return auth_headers

    def disconnect(self):
        """
        Close any existing connections.

        No request is sent; only the connection flag is reset.
        """
        self.is_connected = False
        return

    def check_connection(self) -> StatusResponse:
        """
        Check connection to the handler.
        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            self.connect()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Dremio, {e}!')
            response.error_message = str(e)
        finally:
            if response.success is True and need_to_close:
                self.disconnect()
            if response.success is False and self.is_connected is True:
                self.is_connected = False

        return response

    def native_query(self, query: str) -> Response:
        """
        Receive raw query and act upon it somehow.
        Args:
            query (str): query in native format
        Returns:
            HandlerResponse: a TABLE response holding the result rows, or an
                ERROR response if login, submission, execution or result
                retrieval failed.
        """
        need_to_close = self.is_connected is False

        try:
            response = self._run_job(query)
        except Exception as e:
            logger.error(f'Error running query: {query} on Dremio, {e}!')
            response = Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(e)
            )
        finally:
            # Always restore the connection flag, whatever happened above.
            if need_to_close is True:
                self.disconnect()

        return response

    def _run_job(self, query: str) -> Response:
        """Submit the query as a Dremio job, wait for it and collect its rows."""
        auth_headers = self.connect()

        # Serialise with json.dumps so quotes, backslashes, tabs and newlines
        # in the SQL survive intact (flattening newlines by hand would make a
        # '--' line comment swallow the rest of the statement).
        submission = requests.post(
            self.base_url + '/api/v3/sql',
            headers=auth_headers,
            data=json.dumps({'sql': query}),
            timeout=self._REQUEST_TIMEOUT,
        )
        submission_body = submission.json()

        # Check the outcome before touching 'id': a rejected query carries an
        # error message, not a job id.
        if submission.status_code != 200 or 'id' not in submission_body:
            logger.info('Job creation failed.')
            error_message = submission.text
            if isinstance(submission_body, dict) and 'errorMessage' in submission_body:
                error_message = submission_body['errorMessage']
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(error_message)
            )

        job_id = submission_body['id']
        logger.info('Job creation successful. Job id is: ' + job_id)
        logger.info('Waiting for the job to complete...')

        job = self._wait_for_job(job_id, auth_headers)
        if job['jobState'] != 'COMPLETED':
            logger.error('Job failed!')
            # NOTE(review): 'errorMessage' on the job status is expected for
            # failed jobs; a generic message is used when it is absent.
            error_message = job.get('errorMessage') or (
                f"Dremio job {job_id} ended in state {job['jobState']}"
            )
            return Response(
                RESPONSE_TYPE.ERROR,
                error_message=str(error_message)
            )

        return self._fetch_results(job_id, auth_headers)

    def _wait_for_job(self, job_id: str, auth_headers: dict) -> dict:
        """Poll the job until it reaches a terminal state; return its last status."""
        job_url = self.base_url + '/api/v3/job/' + job_id
        while True:
            job = requests.get(
                job_url,
                headers=auth_headers,
                timeout=self._REQUEST_TIMEOUT,
            ).json()
            if job['jobState'] in self._TERMINAL_JOB_STATES:
                return job
            time.sleep(self._POLL_INTERVAL)

    def _fetch_results(self, job_id: str, auth_headers: dict) -> Response:
        """Download every results page of a completed job into one data frame."""
        results_url = self.base_url + '/api/v3/job/' + job_id + '/results'
        rows = []
        while True:
            page = requests.get(
                results_url,
                headers=auth_headers,
                params={'offset': len(rows), 'limit': self._RESULTS_PAGE_SIZE},
                timeout=self._REQUEST_TIMEOUT,
            ).json()

            if 'errorMessage' in page:
                return Response(
                    RESPONSE_TYPE.ERROR,
                    error_message=str(page['errorMessage'])
                )

            page_rows = page['rows']
            rows.extend(page_rows)
            # Stop on an empty page as well, so an unexpected or missing
            # 'rowCount' can never cause an endless loop.
            if not page_rows or len(rows) >= page.get('rowCount', 0):
                break

        return Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(rows)
        )

    def query(self, query: ASTNode) -> Response:
        """
        Receive query as AST (abstract syntax tree) and act upon it somehow.
        Args:
            query (ASTNode): sql query represented as AST. May be any kind
                of query: SELECT, INSERT, DELETE, etc
        Returns:
            HandlerResponse
        """
        renderer = SqlalchemyRender(DremioDialect)
        query_str = renderer.get_string(query, with_failback=True)
        return self.native_query(query_str)

    def get_tables(self) -> Response:
        """
        Return list of entities that will be accessible as tables.
        Returns:
            HandlerResponse
        """
        query = """
            SELECT
                TABLE_NAME,
                TABLE_SCHEMA,
                CASE
                    WHEN TABLE_TYPE = 'TABLE' THEN 'BASE TABLE'
                    ELSE TABLE_TYPE
                END AS TABLE_TYPE
            FROM INFORMATION_SCHEMA."TABLES"
            WHERE TABLE_TYPE <> 'SYSTEM_TABLE';
        """
        return self.native_query(query)

    def get_columns(self, table_name: str) -> Response:
        """
        Returns a list of entity columns.
        Args:
            table_name (str): name of one of tables returned by self.get_tables()
        Returns:
            HandlerResponse: the DESCRIBE output with 'COLUMN_NAME' and
                'DATA_TYPE' renamed to 'Field' and 'Type'; an error response
                is passed through unchanged.
        """
        # NOTE(review): table_name is interpolated as-is, so callers must pass
        # an identifier that is already valid (and quoted if needed) for Dremio.
        query = f"DESCRIBE {table_name}"
        result = self.native_query(query)
        df = result.data_frame
        # An error response carries no data frame; renaming would raise.
        if df is not None:
            result.data_frame = df.rename(columns={'COLUMN_NAME': 'Field', 'DATA_TYPE': 'Type'})
        return result