Coverage for mindsdb / integrations / handlers / azure_blob_handler / azure_blob_handler.py: 0%
153 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-01-21 00:36 +0000
1from mindsdb.integrations.libs.response import (
2 HandlerStatusResponse as StatusResponse,
3 HandlerResponse as Response,
4 RESPONSE_TYPE
5)
6from mindsdb.utilities import log
7import duckdb
8import pandas as pd
10from azure.storage.blob import BlobServiceClient
12from contextlib import contextmanager
13from typing import List, Text, Optional, Dict
14from mindsdb.integrations.libs.api_handler import APIResource, APIHandler
15from mindsdb.integrations.utilities.sql_utils import FilterCondition
16from mindsdb_sql_parser.ast.base import ASTNode
17from mindsdb_sql_parser.ast import Select, Identifier, Insert
18from mindsdb_sql_parser import parse_sql
# Module-level logger for this handler (MindsDB logging wrapper).
logger = log.getLogger(__name__)
class ListFilesTable(APIResource):
    """Virtual `files` table: one row per supported object in the container."""

    def list(self,
             targets: List[str] = None,
             conditions: List[FilterCondition] = None,
             *args, **kwargs) -> pd.DataFrame:
        """Return a DataFrame describing every supported file the handler can see.

        Each row carries the blob's path, its base name and its extension; the
        'content' column is declared but left empty here (filled on demand by
        the handler when a query selects it).
        """
        cleaned_paths = (raw.replace('`', '') for raw in self.handler.get_files())
        rows = [
            {
                'path': blob_path,
                # rsplit yields the full string when no separator is present,
                # matching the rfind(...)+1 slicing of the original.
                'name': blob_path.rsplit('/', 1)[-1],
                'extension': blob_path.rsplit('.', 1)[-1],
            }
            for blob_path in cleaned_paths
        ]
        return pd.DataFrame(data=rows, columns=self.get_columns())

    def get_columns(self) -> List[str]:
        """Column names of the virtual files table ('content' is lazily filled)."""
        return ["path", "name", "extension", "content"]
class FileTable(APIResource):
    """Table view over a single object stored in the Azure Blob container."""

    def list(self, targets: List[str] = None, table_name=None, *args, **kwargs) -> pd.DataFrame:
        """Load the backing object and return it as a DataFrame."""
        return self.handler.read_as_table(table_name)

    def add(self, data, table_name=None):
        """Append rows (any DataFrame-constructible payload) to the backing object."""
        frame = pd.DataFrame(data)
        return self.handler.add_data_to_table(table_name, frame)
class AzureBlobHandler(APIHandler):
    """
    This handler handles connection and execution of the SQL statements on Azure Blob.
    """

    name = "azureblob"
    supported_file_formats = ['csv', 'tsv', 'json', 'parquet']

    def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs):
        """Constructor.

        Args:
            name (Text): the handler name.
            connection_data (Optional[Dict]): connection parameters; recognised
                keys are 'container_name' and 'connection_string'.
        """
        super().__init__(name)

        self.connection = None
        self.is_connected = False
        self._tables = {}
        self._files_table = ListFilesTable(self)
        self.container_name = None
        # Default to None so later attribute access cannot raise AttributeError
        # when 'connection_string' was not supplied.
        self.connection_string = None

        # Tolerate connection_data=None (the annotation allows it) instead of
        # crashing on the membership tests below.
        self.connection_data = connection_data or {}

        if 'container_name' in self.connection_data:
            self.container_name = self.connection_data['container_name']

        if 'connection_string' in self.connection_data:
            self.connection_string = self.connection_data['connection_string']

    def connect(self) -> BlobServiceClient:
        """Set up (and cache) the Azure Blob service client.

        Returns:
            BlobServiceClient: client built from the configured connection string.
        """
        if self.is_connected is True:
            return self.connection

        blob_service_client = BlobServiceClient.from_connection_string(conn_str=self.connection_string)

        self.connection = blob_service_client
        self.is_connected = True
        return blob_service_client

    def check_connection(self) -> StatusResponse:
        """ Check connection to the handler
        Returns:
            HandlerStatusResponse
        """
        response = StatusResponse(False)
        need_to_close = self.is_connected is False

        try:
            client = self.connect()
            # Cheap round-trip that fails fast on a bad connection string.
            client.get_account_information()
            response.success = True
        except Exception as e:
            logger.error(f'Error connecting to Azure Blob: {e}!')
            # Store a string, not the exception object, so the response
            # serializes cleanly.
            response.error_message = str(e)

        if response.success and need_to_close:
            self.disconnect()
        elif not response.success and self.is_connected:
            self.is_connected = False

        return response

    def disconnect(self):
        """
        Closes the connection to the Azure Blob account if it's currently open.
        """
        if not self.is_connected:
            return
        self.connection.close()
        self.is_connected = False

    @contextmanager
    def _connect_duckdb(self):
        """
        Creates temporal duckdb database which is able to connect to the Azure Blob account.
        Have to be used as context manager.

        Returns:
            DuckDBPyConnection
        """
        # Connect to Azure Blob via DuckDB.
        duckdb_conn = duckdb.connect(":memory:")
        duckdb_conn.execute('INSTALL azure')
        duckdb_conn.execute('LOAD azure')

        # Configure mandatory credentials.
        duckdb_conn.execute(f'SET azure_storage_connection_string="{self.connection_string}"')

        try:
            yield duckdb_conn
        finally:
            # Always release the in-memory database, even on error.
            duckdb_conn.close()

    def read_as_table(self, key) -> pd.DataFrame:
        """
        Read object as dataframe. Uses duckdb.
        """
        with self._connect_duckdb() as connection:
            cursor = connection.execute(f'SELECT * FROM "azure://{self.container_name}/{key}"')
            return cursor.fetchdf()

    def _read_as_content(self, key) -> bytes:
        """
        Read object as raw content (bytes).
        """
        connection = self.connect()
        client = connection.get_blob_client(container=self.container_name, blob=key)

        # download_blob() returns a StorageStreamDownloader; readall() drains
        # it into bytes so callers get the actual blob content.
        return client.download_blob().readall()

    def add_data_to_table(self, key, df) -> None:
        """
        Writes the table to a file in the azure container.

        Args:
            key: blob name (path inside the container).
            df: pandas DataFrame with the rows to append.

        Raises:
            CatalogException: If the table does not exist in the DuckDB connection.
        """
        # Check if the file exists in the Container.
        # NOTE(review): get_blob_client() may not hit the network by itself —
        # confirm this existence check actually raises for a missing blob.
        try:
            client = self.connect()
            blob_client = client.get_blob_client(container=self.container_name, blob=key)
            blob_client.close()
        except Exception as e:
            logger.error(f'Error querying the file {key} in the container {self.container_name}, {e}!')
            raise e

        with self._connect_duckdb() as connection:
            # copy: materialize the current blob contents into a temp table
            connection.execute(f'CREATE TABLE tmp_table AS SELECT * FROM "azure://{self.container_name}/{key}"')

            # insert: duckdb resolves the local variable `df` by name
            connection.execute("INSERT INTO tmp_table BY NAME SELECT * FROM df")

            # upload: write the merged table back over the original blob
            connection.execute(f"COPY tmp_table TO 'azure://{self.container_name}/{key}'")

    def native_query(self, query: str) -> Response:
        """
        Executes a SQL query and returns the result.

        Args:
            query (str): The SQL query to be executed.

        Returns:
            Response: A response object containing the result of the query or an error message.
        """
        query_ast = parse_sql(query)
        return self.query(query_ast)

    def query(self, query: ASTNode) -> Response:
        """
        Executes a SQL query represented by an ASTNode and retrieves the data.

        Args:
            query (ASTNode): An ASTNode representing the SQL query to be executed.

        Raises:
            ValueError: If the file format is not supported or the file does not exist in the Azure Blob Container.

        Returns:
            Response: The response from the `native_query` method, containing the result of the SQL query execution.
        """
        self.connect()

        if isinstance(query, Select):
            table_name = query.from_table.parts[-1].replace('`', '')

            if table_name == 'files':
                table = self._files_table
                df = table.select(query)

                # add content only when it is explicitly selected — each value
                # requires a full blob download
                has_content = False
                for target in query.targets:
                    if isinstance(target, Identifier) and target.parts[-1].lower() == 'content':
                        has_content = True
                        break
                if has_content:
                    df['content'] = df['path'].apply(self._read_as_content)
            else:
                extension = table_name.split('.')[-1]
                if extension not in self.supported_file_formats:
                    logger.error(f'The file format {extension} is not supported!')
                    raise ValueError(f'The file format {extension} is not supported!')

                table = FileTable(self, table_name=table_name)
                df = table.select(query)

            response = Response(
                RESPONSE_TYPE.TABLE,
                data_frame=df
            )
        elif isinstance(query, Insert):
            # strip backticks for consistency with the Select branch above
            table_name = query.table.parts[-1].replace('`', '')
            table = FileTable(self, table_name=table_name)
            table.insert(query)
            response = Response(RESPONSE_TYPE.OK)
        else:
            raise NotImplementedError

        return response

    def get_files(self) -> List[str]:
        """List supported blobs in the container, backtick-quoted for SQL use."""
        client = self.connect()
        container_client = client.get_container_client(self.container_name)
        all_files = container_client.list_blobs()

        # Wrap the object names with backticks to prevent SQL syntax errors.
        supported_files = [
            f"`{file.get('name')}`"
            for file in all_files if file.get('name').split('.')[-1] in self.supported_file_formats
        ]

        return supported_files

    def get_tables(self) -> Response:
        """
        Retrieves a list of tables (objects) in the Azure Containers.

        Each object is considered a table. Only the supported file formats are considered as tables.

        Returns:
            Response: A response object containing the list of tables and views, formatted as per the `Response` class.
        """
        # Get only the supported file formats.
        # Wrap the object names with backticks to prevent SQL syntax errors.
        supported_files = self.get_files()

        # virtual table with list of files
        supported_files.insert(0, 'files')

        response = Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(
                supported_files,
                columns=['table_name']
            )
        )

        return response

    def get_columns(self, table_name: str) -> Response:
        """
        Retrieves column details for a specified table (object) in the Azure Blob Container.

        Args:
            table_name (Text): The name of the table for which to retrieve column information.

        Raises:
            ValueError: If the 'table_name' is not a valid string.

        Returns:
            Response: A response object containing the column details, formatted as per the `Response` class.
        """
        if not table_name or not isinstance(table_name, str):
            raise ValueError("Invalid table name provided.")

        query = f"SELECT * FROM {table_name} LIMIT 5"
        # Use native_query (str -> AST -> query). self.query() expects an
        # ASTNode: passing the raw string here made the isinstance dispatch
        # fall through to NotImplementedError.
        result = self.native_query(query)

        response = Response(
            RESPONSE_TYPE.TABLE,
            data_frame=pd.DataFrame(
                {
                    'column_name': result.data_frame.columns,
                    'data_type': [data_type if data_type != 'object' else 'string' for data_type in result.data_frame.dtypes]
                }
            )
        )

        return response