Coverage for mindsdb / integrations / handlers / azure_blob_handler / azure_blob_handler.py: 0%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1from mindsdb.integrations.libs.response import ( 

2 HandlerStatusResponse as StatusResponse, 

3 HandlerResponse as Response, 

4 RESPONSE_TYPE 

5) 

6from mindsdb.utilities import log 

7import duckdb 

8import pandas as pd 

9 

10from azure.storage.blob import BlobServiceClient 

11 

12from contextlib import contextmanager 

13from typing import List, Text, Optional, Dict 

14from mindsdb.integrations.libs.api_handler import APIResource, APIHandler 

15from mindsdb.integrations.utilities.sql_utils import FilterCondition 

16from mindsdb_sql_parser.ast.base import ASTNode 

17from mindsdb_sql_parser.ast import Select, Identifier, Insert 

18from mindsdb_sql_parser import parse_sql 

19 

20logger = log.getLogger(__name__) 

21 

22 

23class ListFilesTable(APIResource): 

24 

25 def list(self, 

26 targets: List[str] = None, 

27 conditions: List[FilterCondition] = None, 

28 *args, **kwargs) -> pd.DataFrame: 

29 

30 tables = self.handler.get_files() 

31 data = [] 

32 for path in tables: 

33 path = path.replace('`', '') 

34 item = { 

35 'path': path, 

36 'name': path[path.rfind('/') + 1:], 

37 'extension': path[path.rfind('.') + 1:] 

38 } 

39 

40 data.append(item) 

41 

42 return pd.DataFrame(data=data, columns=self.get_columns()) 

43 

44 def get_columns(self) -> List[str]: 

45 return ["path", "name", "extension", "content"] 

46 

47 

48class FileTable(APIResource): 

49 

50 def list(self, targets: List[str] = None, table_name=None, *args, **kwargs) -> pd.DataFrame: 

51 return self.handler.read_as_table(table_name) 

52 

53 def add(self, data, table_name=None): 

54 df = pd.DataFrame(data) 

55 return self.handler.add_data_to_table(table_name, df) 

56 

57 

58class AzureBlobHandler(APIHandler): 

59 """ 

60 This handler handles connection and execution of the SQL statements on Azure Blob. 

61 """ 

62 

63 name = "azureblob" 

64 supported_file_formats = ['csv', 'tsv', 'json', 'parquet'] 

65 

66 def __init__(self, name: Text, connection_data: Optional[Dict], **kwargs): 

67 super().__init__(name) 

68 """ constructor 

69 Args: 

70 name (str): the handler name 

71 """ 

72 

73 self.connection = None 

74 self.is_connected = False 

75 self._tables = {} 

76 self._files_table = ListFilesTable(self) 

77 self.container_name = None 

78 

79 self.connection_data = connection_data 

80 

81 if 'container_name' in connection_data: 

82 self.container_name = connection_data['container_name'] 

83 

84 if 'connection_string' in connection_data: 

85 self.connection_string = connection_data['connection_string'] 

86 

87 def connect(self) -> BlobServiceClient: 

88 """ Set up any connections required by the handler 

89 Should return output of check_connection() method after attempting 

90 connection. Should switch self.is_connected. 

91 Returns: 

92 HandlerStatusResponse 

93 """ 

94 if self.is_connected is True: 

95 return self.connection 

96 

97 blob_service_client = BlobServiceClient.from_connection_string(conn_str=self.connection_string) 

98 

99 self.connection = blob_service_client 

100 self.is_connected = True 

101 return blob_service_client 

102 

103 def check_connection(self) -> StatusResponse: 

104 """ Check connection to the handler 

105 Returns: 

106 HandlerStatusResponse 

107 """ 

108 response = StatusResponse(False) 

109 need_to_close = self.is_connected is False 

110 

111 try: 

112 client = self.connect() 

113 client.get_account_information() 

114 response.success = True 

115 

116 except Exception as e: 

117 logger.error(f'Error connecting to Azure Blob: {e}!') 

118 response.error_message = e 

119 

120 if response.success and need_to_close: 

121 self.disconnect() 

122 

123 elif not response.success and self.is_connected: 

124 self.is_connected = False 

125 

126 return response 

127 

128 def disconnect(self): 

129 """ 

130 Closes the connection to the Azure Blob account if it's currently open. 

131 """ 

132 if not self.is_connected: 

133 return 

134 self.connection.close() 

135 self.is_connected = False 

136 

137 @contextmanager 

138 def _connect_duckdb(self): 

139 """ 

140 Creates temporal duckdb database which is able to connect to the Azure Blob account. 

141 Have to be used as context manager 

142 

143 Returns: 

144 DuckDBPyConnection 

145 """ 

146 # Connect to Azure Blob via DuckDB. 

147 duckdb_conn = duckdb.connect(":memory:") 

148 duckdb_conn.execute('INSTALL azure') 

149 duckdb_conn.execute('LOAD azure') 

150 

151 # Configure mandatory credentials. 

152 duckdb_conn.execute(f'SET azure_storage_connection_string="{self.connection_string}"') 

153 

154 try: 

155 yield duckdb_conn 

156 finally: 

157 duckdb_conn.close() 

158 

159 def read_as_table(self, key) -> pd.DataFrame: 

160 """ 

161 Read object as dataframe. Uses duckdb 

162 """ 

163 

164 with self._connect_duckdb() as connection: 

165 cursor = connection.execute(f'SELECT * FROM "azure://{self.container_name}/{key}"') 

166 return cursor.fetchdf() 

167 

168 def _read_as_content(self, key) -> None: 

169 """ 

170 Read object as content 

171 """ 

172 

173 connection = self.connect() 

174 client = connection.get_blob_client(container=self.container_name, blob=key) 

175 

176 return client.download_blob() 

177 

178 def add_data_to_table(self, key, df) -> None: 

179 pass 

180 """ 

181 Writes the table to a file in the azure container. 

182 

183 Raises: 

184 CatalogException: If the table does not exist in the DuckDB connection. 

185 """ 

186 

187 # Check if the file exists in the Container. 

188 

189 try: 

190 client = self.connect() 

191 blob_client = client.get_blob_client(container=self.container_name, blob=key) 

192 blob_client.close() 

193 

194 except Exception as e: 

195 logger.error(f'Error querying the file {key} in the container {self.container_name}, {e}!') 

196 raise e 

197 

198 with self._connect_duckdb() as connection: 

199 # copy 

200 connection.execute(f'CREATE TABLE tmp_table AS SELECT * FROM "azure://{self.container_name}/{key}"') 

201 

202 # insert 

203 connection.execute("INSERT INTO tmp_table BY NAME SELECT * FROM df") 

204 

205 # upload 

206 connection.execute(f"COPY tmp_table TO 'azure://{self.container_name}/{key}'") 

207 

208 def native_query(self, query: str) -> Response: 

209 """ 

210 Executes a SQL query and returns the result. 

211 

212 Args: 

213 query (str): The SQL query to be executed. 

214 

215 Returns: 

216 Response: A response object containing the result of the query or an error message. 

217 """ 

218 query_ast = parse_sql(query) 

219 return self.query(query_ast) 

220 

221 def query(self, query: ASTNode) -> Response: 

222 """ 

223 Executes a SQL query represented by an ASTNode and retrieves the data. 

224 

225 Args: 

226 query (ASTNode): An ASTNode representing the SQL query to be executed. 

227 

228 Raises: 

229 ValueError: If the file format is not supported or the file does not exist in the Azure Blob Container. 

230 

231 Returns: 

232 Response: The response from the `native_query` method, containing the result of the SQL query execution. 

233 """ 

234 

235 self.connect() 

236 

237 if isinstance(query, Select): 

238 table_name = query.from_table.parts[-1].replace('`', '') 

239 

240 if table_name == 'files': 

241 table = self._files_table 

242 df = table.select(query) 

243 

244 # add content 

245 has_content = False 

246 for target in query.targets: 

247 if isinstance(target, Identifier) and target.parts[-1].lower() == 'content': 

248 has_content = True 

249 break 

250 if has_content: 

251 df['content'] = df['path'].apply(self._read_as_content) 

252 else: 

253 extension = table_name.split('.')[-1] 

254 if extension not in self.supported_file_formats: 

255 logger.error(f'The file format {extension} is not supported!') 

256 raise ValueError(f'The file format {extension} is not supported!') 

257 

258 table = FileTable(self, table_name=table_name) 

259 df = table.select(query) 

260 

261 response = Response( 

262 RESPONSE_TYPE.TABLE, 

263 data_frame=df 

264 ) 

265 elif isinstance(query, Insert): 

266 table_name = query.table.parts[-1] 

267 table = FileTable(self, table_name=table_name) 

268 table.insert(query) 

269 response = Response(RESPONSE_TYPE.OK) 

270 else: 

271 raise NotImplementedError 

272 

273 return response 

274 

275 def get_files(self) -> List[str]: 

276 client = self.connect() 

277 container_client = client.get_container_client(self.container_name) 

278 all_files = container_client.list_blobs() 

279 

280 # Wrap the object names with backticks to prevent SQL syntax errors. 

281 supported_files = [ 

282 f"`{file.get('name')}`" 

283 for file in all_files if file.get('name').split('.')[-1] in self.supported_file_formats 

284 ] 

285 

286 return supported_files 

287 

288 def get_tables(self) -> Response: 

289 """ 

290 Retrieves a list of tables (objects) in the Azure Containers. 

291 

292 Each object is considered a table. Only the supported file formats are considered as tables. 

293 

294 Returns: 

295 Response: A response object containing the list of tables and views, formatted as per the `Response` class. 

296 """ 

297 

298 # Get only the supported file formats. 

299 # Wrap the object names with backticks to prevent SQL syntax errors. 

300 

301 supported_files = self.get_files() 

302 

303 # virtual table with list of files 

304 supported_files.insert(0, 'files') 

305 

306 response = Response( 

307 RESPONSE_TYPE.TABLE, 

308 data_frame=pd.DataFrame( 

309 supported_files, 

310 columns=['table_name'] 

311 ) 

312 ) 

313 

314 return response 

315 

316 def get_columns(self, table_name: str) -> Response: 

317 """ 

318 Retrieves column details for a specified table (object) in the Azure Blob Container. 

319 

320 Args: 

321 table_name (Text): The name of the table for which to retrieve column information. 

322 

323 Raises: 

324 ValueError: If the 'table_name' is not a valid string. 

325 

326 Returns: 

327 Response: A response object containing the column details, formatted as per the `Response` class. 

328 """ 

329 if not table_name or not isinstance(table_name, str): 

330 raise ValueError("Invalid table name provided.") 

331 

332 query = f"SELECT * FROM {table_name} LIMIT 5" 

333 

334 result = self.query(query) 

335 

336 response = Response( 

337 RESPONSE_TYPE.TABLE, 

338 data_frame=pd.DataFrame( 

339 { 

340 'column_name': result.data_frame.columns, 

341 'data_type': [data_type if data_type != 'object' else 'string' for data_type in result.data_frame.dtypes] 

342 } 

343 ) 

344 ) 

345 

346 return response