Coverage for mindsdb / integrations / handlers / snowflake_handler / snowflake_handler.py: 80%

324 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-21 00:36 +0000

1import psutil 

2import pandas 

3from pandas import DataFrame 

4from pandas.api import types as pd_types 

5from snowflake.sqlalchemy import snowdialect 

6from snowflake import connector 

7from snowflake.connector.errors import NotSupportedError 

8from snowflake.connector.cursor import SnowflakeCursor, ResultMetadata 

9from typing import Any, Optional, List 

10 

11from mindsdb_sql_parser.ast.base import ASTNode 

12from mindsdb_sql_parser.ast import Select, Identifier 

13 

14from mindsdb.utilities import log 

15from mindsdb.integrations.libs.base import MetaDatabaseHandler 

16from mindsdb.utilities.render.sqlalchemy_render import SqlalchemyRender 

17from mindsdb.integrations.libs.response import ( 

18 HandlerStatusResponse as StatusResponse, 

19 HandlerResponse as Response, 

20 RESPONSE_TYPE, 

21) 

22from mindsdb.api.mysql.mysql_proxy.libs.constants.mysql import MYSQL_DATA_TYPE 

23 

24from .auth_types import ( 

25 PasswordAuthType, 

26 KeyPairAuthType, 

27) 

28 

# pyarrow is an optional acceleration dependency here: when present, we keep a
# handle to its default memory pool so native_query() can release unused Arrow
# buffers after large result sets. When it is missing (or fails on import),
# queries still work — we just skip that cleanup step.
try:
    import pyarrow as pa

    memory_pool = pa.default_memory_pool()
except Exception:
    # Broad except is deliberate: any import-time failure downgrades to "no pool".
    memory_pool = None


logger = log.getLogger(__name__)

38 

39 

def _map_type(internal_type_name: str) -> MYSQL_DATA_TYPE:
    """Map Snowflake types to MySQL types.

    Args:
        internal_type_name (str): The name of the Snowflake type to map.

    Returns:
        MYSQL_DATA_TYPE: The MySQL type that corresponds to the Snowflake type.
    """
    internal_type_name = internal_type_name.upper()
    # NOTE: every key must be a real tuple. A bare parenthesized string such as
    # ("VARCHAR") is a str, and `in` on a str performs substring matching — that
    # bug made e.g. "CHAR" match the "VARCHAR" entry and return the wrong type.
    types_map = {
        ("NUMBER", "DECIMAL", "DEC", "NUMERIC"): MYSQL_DATA_TYPE.DECIMAL,
        ("INT", "INTEGER", "BIGINT", "SMALLINT", "TINYINT", "BYTEINT"): MYSQL_DATA_TYPE.INT,
        ("FLOAT", "FLOAT4", "FLOAT8"): MYSQL_DATA_TYPE.FLOAT,
        ("DOUBLE", "DOUBLE PRECISION", "REAL"): MYSQL_DATA_TYPE.DOUBLE,
        ("VARCHAR",): MYSQL_DATA_TYPE.VARCHAR,
        ("CHAR", "CHARACTER", "NCHAR"): MYSQL_DATA_TYPE.CHAR,
        ("STRING", "TEXT", "NVARCHAR"): MYSQL_DATA_TYPE.TEXT,
        ("NVARCHAR2", "CHAR VARYING", "NCHAR VARYING"): MYSQL_DATA_TYPE.VARCHAR,
        ("BINARY", "VARBINARY"): MYSQL_DATA_TYPE.BINARY,
        ("BOOLEAN",): MYSQL_DATA_TYPE.BOOL,
        ("TIMESTAMP_NTZ", "DATETIME"): MYSQL_DATA_TYPE.DATETIME,
        ("DATE",): MYSQL_DATA_TYPE.DATE,
        ("TIME",): MYSQL_DATA_TYPE.TIME,
        ("TIMESTAMP_LTZ",): MYSQL_DATA_TYPE.DATETIME,
        ("TIMESTAMP_TZ",): MYSQL_DATA_TYPE.DATETIME,
        ("VARIANT", "OBJECT", "ARRAY", "MAP", "GEOGRAPHY", "GEOMETRY", "VECTOR"): MYSQL_DATA_TYPE.VARCHAR,
    }

    for db_types_list, mysql_data_type in types_map.items():
        if internal_type_name in db_types_list:
            return mysql_data_type

    logger.debug(f"Snowflake handler type mapping: unknown type: {internal_type_name}, use VARCHAR as fallback.")
    return MYSQL_DATA_TYPE.VARCHAR

75 

76 

def _make_table_response(result: DataFrame, cursor: SnowflakeCursor) -> Response:
    """Build response from result and cursor.

    NOTE: Snowflake returns only a 'general' type in the cursor description, so
    rely on the result DataFrame's dtypes, and fall back to the description
    types only when the DataFrame dtype is 'object'.

    Args:
        result (DataFrame): result of the query.
        cursor (SnowflakeCursor): cursor object.

    Returns:
        Response: response object.
    """
    description: list[ResultMetadata] = cursor.description
    mysql_types: list[MYSQL_DATA_TYPE] = []
    for column in description:
        column_dtype = result[column.name].dtype
        description_column_type = connector.constants.FIELD_ID_TO_NAME.get(column.type_code)
        if description_column_type in ("OBJECT", "ARRAY"):
            mysql_types.append(MYSQL_DATA_TYPE.JSON)
            continue
        if description_column_type == "VECTOR":
            mysql_types.append(MYSQL_DATA_TYPE.VECTOR)
            continue
        if pd_types.is_integer_dtype(column_dtype):
            # Compare dtype *names* so numpy ("int16") and pandas nullable
            # ("Int16") dtypes are matched uniformly. The previous code compared
            # the dtype object itself to strings in the 16/32/64-bit branches,
            # inconsistently with the int8 branch, relying on dtype.__eq__
            # string coercion.
            column_dtype_name = column_dtype.name
            if column_dtype_name in ("int8", "Int8"):
                mysql_types.append(MYSQL_DATA_TYPE.TINYINT)
            elif column_dtype_name in ("int16", "Int16"):
                mysql_types.append(MYSQL_DATA_TYPE.SMALLINT)
            elif column_dtype_name in ("int32", "Int32"):
                mysql_types.append(MYSQL_DATA_TYPE.MEDIUMINT)
            elif column_dtype_name in ("int64", "Int64"):
                mysql_types.append(MYSQL_DATA_TYPE.BIGINT)
            else:
                mysql_types.append(MYSQL_DATA_TYPE.INT)
            continue
        if pd_types.is_float_dtype(column_dtype):
            column_dtype_name = column_dtype.name
            if column_dtype_name in ("float16", "Float16"):  # Float16 does not exists so far
                mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            elif column_dtype_name in ("float32", "Float32"):
                mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            elif column_dtype_name in ("float64", "Float64"):
                mysql_types.append(MYSQL_DATA_TYPE.DOUBLE)
            else:
                mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            continue
        if pd_types.is_bool_dtype(column_dtype):
            mysql_types.append(MYSQL_DATA_TYPE.BOOLEAN)
            continue
        if pd_types.is_datetime64_any_dtype(column_dtype):
            mysql_types.append(MYSQL_DATA_TYPE.DATETIME)
            series = result[column.name]
            # snowflake use pytz.timezone: normalize non-UTC tz-aware values to
            # UTC, then strip the tz info so values are naive datetimes.
            if series.dt.tz is not None and getattr(series.dt.tz, "zone", "UTC") != "UTC":
                series = series.dt.tz_convert("UTC")
                result[column.name] = series.dt.tz_localize(None)
            continue

        if pd_types.is_object_dtype(column_dtype):
            if description_column_type == "TEXT":
                # we can also check column.internal_size, if == 16777216 then it is TEXT, else VARCHAR(internal_size)
                mysql_types.append(MYSQL_DATA_TYPE.TEXT)
                continue
            elif description_column_type == "BINARY":
                # if column.internal_size == 8388608 then BINARY, else VARBINARY(internal_size)
                mysql_types.append(MYSQL_DATA_TYPE.BINARY)
                continue
            elif description_column_type == "DATE":
                mysql_types.append(MYSQL_DATA_TYPE.DATE)
                continue
            elif description_column_type == "TIME":
                mysql_types.append(MYSQL_DATA_TYPE.TIME)
                continue

        if description_column_type == "FIXED":
            if column.scale == 0:
                mysql_types.append(MYSQL_DATA_TYPE.INT)
            else:
                # It is NUMBER, DECIMAL or NUMERIC with scale > 0
                mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            continue
        elif description_column_type == "REAL":
            mysql_types.append(MYSQL_DATA_TYPE.FLOAT)
            continue

        # Fallback for anything not recognized above.
        mysql_types.append(MYSQL_DATA_TYPE.TEXT)

    df = DataFrame(
        result,
        columns=[column.name for column in description],
    )

    return Response(RESPONSE_TYPE.TABLE, data_frame=df, affected_rows=None, mysql_types=mysql_types)

171 

172 

class SnowflakeHandler(MetaDatabaseHandler):
    """
    This handler handles connection and execution of the Snowflake statements.
    """

    name = "snowflake"

    # Supported authentication strategies, keyed by the user-facing auth_type value.
    _auth_types = {
        "key_pair": KeyPairAuthType(),
        "password": PasswordAuthType(),
    }

    def __init__(self, name, **kwargs):
        super().__init__(name)
        # Raw connection parameters as supplied by the user.
        self.connection_data = kwargs.get("connection_data")
        # Renders AST queries into Snowflake SQL text.
        self.renderer = SqlalchemyRender(snowdialect.dialect)

        self.connection = None
        self.is_connected = False
192 

193 def connect(self): 

194 """ 

195 Establishes a connection to a Snowflake account. 

196 

197 Supports two authentication methods: 

198 1. User/password authentication (legacy) 

199 2. Key pair authentication (recommended) 

200 

201 Raises: 

202 ValueError: If the required connection parameters are not provided. 

203 snowflake.connector.errors.Error: If an error occurs while connecting to the Snowflake account. 

204 

205 Returns: 

206 snowflake.connector.connection.SnowflakeConnection: A connection object to the Snowflake account. 

207 """ 

208 

209 if self.is_connected is True: 209 ↛ 210line 209 didn't jump to line 210 because the condition on line 209 was never true

210 return self.connection 

211 

212 auth_type_key = self.connection_data.get("auth_type") 

213 if auth_type_key is None: 

214 supported = ", ".join(self._auth_types.keys()) 

215 raise ValueError(f"auth_type is required. Supported values: {supported}.") 

216 

217 auth_type = self._auth_types.get(auth_type_key) 

218 if not auth_type: 

219 supported = ", ".join(self._auth_types.keys()) 

220 raise ValueError(f"Invalid auth_type '{auth_type_key}'. Supported values: {supported}.") 

221 

222 config = auth_type.get_config(**self.connection_data) 

223 

224 try: 

225 self.connection = connector.connect(**config) 

226 self.connection.telemetry_enabled = False 

227 self.is_connected = True 

228 return self.connection 

229 except connector.errors.Error as e: 

230 logger.error(f"Error connecting to Snowflake, {e}!") 

231 raise 

232 

233 def disconnect(self): 

234 """ 

235 Closes the connection to the Snowflake account if it's currently open. 

236 """ 

237 

238 if self.is_connected is False: 

239 return 

240 self.connection.close() 

241 self.is_connected = False 

242 

243 def check_connection(self) -> StatusResponse: 

244 """ 

245 Checks the status of the connection to the Snowflake account. 

246 

247 Returns: 

248 StatusResponse: An object containing the success status and an error message if an error occurs. 

249 """ 

250 response = StatusResponse(False) 

251 need_to_close = not self.is_connected 

252 

253 try: 

254 connection = self.connect() 

255 

256 # Execute a simple query to test the connection 

257 with connection.cursor() as cur: 

258 cur.execute("select 1;") 

259 response.success = True 

260 except (connector.errors.Error, ValueError) as e: 

261 logger.error(f"Error connecting to Snowflake, {e}!") 

262 response.error_message = str(e) 

263 

264 if response.success and need_to_close: 

265 self.disconnect() 

266 

267 elif not response.success and self.is_connected: 

268 self.is_connected = False 

269 

270 return response 

271 

272 def native_query(self, query: str) -> Response: 

273 """ 

274 Executes a SQL query on the Snowflake account and returns the result. 

275 

276 Args: 

277 query (str): The SQL query to be executed. 

278 

279 Returns: 

280 Response: A response object containing the result of the query or an error message. 

281 """ 

282 

283 need_to_close = self.is_connected is False 

284 

285 connection = self.connect() 

286 with connection.cursor(connector.DictCursor) as cur: 

287 try: 

288 cur.execute(query) 

289 try: 

290 try: 

291 batches_iter = cur.fetch_pandas_batches() 

292 except ValueError: 

293 # duplicated columns raises ValueError 

294 raise NotSupportedError() 

295 

296 batches = [] 

297 memory_estimation_check_done = False 

298 

299 for batch_df in batches_iter: 

300 batches.append(batch_df) 

301 # region check the size of first batch (if it is big enough) to get an estimate of the full 

302 # dataset size. If i does not fit in memory - raise an error. 

303 # NOTE batch size cannot be set on client side. Also, Snowflake will download 

304 # 'CLIENT_PREFETCH_THREADS' count of chunks in parallel (by default 4), therefore this check 

305 # can not work in some cases. 

306 batches_rowcount = sum([len(x) for x in batches]) 

307 if memory_estimation_check_done is False and batches_rowcount > 1000: 

308 memory_estimation_check_done = True 

309 available_memory_kb = psutil.virtual_memory().available >> 10 

310 batches_size_kb = sum( 

311 [(x.memory_usage(index=True, deep=True).sum() >> 10) for x in batches] 

312 ) 

313 total_rowcount = cur.rowcount 

314 rest_rowcount = total_rowcount - batches_rowcount 

315 rest_estimated_size_kb = int((rest_rowcount / batches_rowcount) * batches_size_kb) 

316 if (available_memory_kb * 0.9) < rest_estimated_size_kb: 316 ↛ 299line 316 didn't jump to line 299 because the condition on line 316 was always true

317 logger.error( 

318 "Attempt to get too large dataset:\n" 

319 f"batches_rowcount={batches_rowcount}, size_kb={batches_size_kb}\n" 

320 f"total_rowcount={total_rowcount}, estimated_size_kb={rest_estimated_size_kb}\n" 

321 f"available_memory_kb={available_memory_kb}" 

322 ) 

323 raise MemoryError("Not enought memory") 

324 # endregion 

325 if len(batches) > 0: 325 ↛ 328line 325 didn't jump to line 328 because the condition on line 325 was always true

326 response = _make_table_response(result=pandas.concat(batches, ignore_index=True), cursor=cur) 

327 else: 

328 response = Response(RESPONSE_TYPE.TABLE, DataFrame([], columns=[x[0] for x in cur.description])) 

329 except NotSupportedError: 

330 # Fallback for CREATE/DELETE/UPDATE. These commands returns table with single column, 

331 # but it cannot be retrieved as pandas DataFrame. 

332 result = cur.fetchall() 

333 match result: 

334 case ( 

335 [{"number of rows inserted": affected_rows}] 

336 | [{"number of rows deleted": affected_rows}] 

337 | [{"number of rows updated": affected_rows, "number of multi-joined rows updated": _}] 

338 ): 

339 response = Response(RESPONSE_TYPE.OK, affected_rows=affected_rows) 

340 case list(): 

341 response = Response( 

342 RESPONSE_TYPE.TABLE, DataFrame(result, columns=[x[0] for x in cur.description]) 

343 ) 

344 case _: 

345 # Looks like SnowFlake always returns something in response, so this is suspicious 

346 logger.warning("Snowflake did not return any data in response.") 

347 response = Response(RESPONSE_TYPE.OK) 

348 except Exception as e: 

349 logger.error(f"Error running query: {query} on {self.connection_data.get('database')}, {e}!") 

350 response = Response(RESPONSE_TYPE.ERROR, error_code=0, error_message=str(e)) 

351 

352 if need_to_close is True: 352 ↛ 355line 352 didn't jump to line 355 because the condition on line 352 was always true

353 self.disconnect() 

354 

355 if memory_pool is not None and memory_pool.backend_name == "jemalloc": 

356 # This reduce memory consumption, but will slow down next query slightly. 

357 # Except pool type 'jemalloc': memory consumption do not change significantly 

358 # and next query processing time may be even lower. 

359 memory_pool.release_unused() 

360 

361 return response 

362 

363 def query(self, query: ASTNode) -> Response: 

364 """ 

365 Executes a SQL query represented by an ASTNode and retrieves the data. 

366 

367 Args: 

368 query (ASTNode): An ASTNode representing the SQL query to be executed. 

369 

370 Returns: 

371 Response: The response from the `native_query` method, containing the result of the SQL query execution. 

372 """ 

373 

374 query_str = self.renderer.get_string(query, with_failback=True) 

375 logger.debug(f"Executing SQL query: {query_str}") 

376 result = self.native_query(query_str) 

377 return self.lowercase_columns(result, query) 

378 

379 def lowercase_columns(self, result, query): 

380 if not isinstance(query, Select) or result.data_frame is None: 380 ↛ 383line 380 didn't jump to line 383 because the condition on line 380 was always true

381 return result 

382 

383 quoted_columns = [] 

384 if query.targets is not None: 

385 for column in query.targets: 

386 if hasattr(column, "alias") and column.alias is not None: 

387 if column.alias.is_quoted[-1]: 

388 quoted_columns.append(column.alias.parts[-1]) 

389 elif isinstance(column, Identifier): 

390 if column.is_quoted[-1]: 

391 quoted_columns.append(column.parts[-1]) 

392 

393 rename_columns = {} 

394 for col in result.data_frame.columns: 

395 if col.isupper() and col not in quoted_columns: 

396 rename_columns[col] = col.lower() 

397 if rename_columns: 

398 result.data_frame = result.data_frame.rename(columns=rename_columns) 

399 return result 

400 

    def get_tables(self) -> Response:
        """
        Retrieves a list of all non-system tables and views in the current schema of the Snowflake account.

        Returns:
            Response: A response object containing the list of tables and views, formatted as per the `Response` class.
        """

        # Restrict to the session's current schema; TABLE_TYPE filters the
        # listing down to base tables and views.
        query = """
            SELECT TABLE_NAME, TABLE_SCHEMA, TABLE_TYPE
            FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_TYPE IN ('BASE TABLE', 'VIEW')
              AND TABLE_SCHEMA = current_schema()
        """
        return self.native_query(query)

416 

417 def get_columns(self, table_name) -> Response: 

418 """ 

419 Retrieves column details for a specified table in the Snowflake account. 

420 

421 Args: 

422 table_name (str): The name of the table for which to retrieve column information. 

423 

424 Returns: 

425 Response: A response object containing the column details, formatted as per the `Response` class. 

426 

427 Raises: 

428 ValueError: If the 'table_name' is not a valid string. 

429 """ 

430 

431 if not table_name or not isinstance(table_name, str): 431 ↛ 432line 431 didn't jump to line 432 because the condition on line 431 was never true

432 raise ValueError("Invalid table name provided.") 

433 

434 query = f""" 

435 SELECT 

436 COLUMN_NAME, 

437 DATA_TYPE, 

438 ORDINAL_POSITION, 

439 COLUMN_DEFAULT, 

440 IS_NULLABLE, 

441 CHARACTER_MAXIMUM_LENGTH, 

442 CHARACTER_OCTET_LENGTH, 

443 NUMERIC_PRECISION, 

444 NUMERIC_SCALE, 

445 DATETIME_PRECISION, 

446 CHARACTER_SET_NAME, 

447 COLLATION_NAME 

448 FROM INFORMATION_SCHEMA.COLUMNS 

449 WHERE TABLE_NAME = '{table_name}' 

450 AND TABLE_SCHEMA = current_schema() 

451 """ 

452 result = self.native_query(query) 

453 result.to_columns_table_response(map_type_fn=_map_type) 

454 

455 return result 

456 

457 def meta_get_tables(self, table_names: Optional[List[str]] = None) -> Response: 

458 """ 

459 Retrieves metadata information about the tables in the Snowflake database to be stored in the data catalog. 

460 

461 Args: 

462 table_names (list): A list of table names for which to retrieve metadata information. 

463 

464 Returns: 

465 Response: A response object containing the metadata information, formatted as per the `Response` class. 

466 """ 

467 query = """ 

468 SELECT 

469 TABLE_CATALOG, 

470 TABLE_SCHEMA, 

471 TABLE_NAME, 

472 TABLE_TYPE, 

473 COMMENT AS TABLE_DESCRIPTION, 

474 ROW_COUNT, 

475 CREATED, 

476 LAST_ALTERED 

477 FROM INFORMATION_SCHEMA.TABLES 

478 WHERE TABLE_SCHEMA = current_schema() 

479 AND TABLE_TYPE IN ('BASE TABLE', 'VIEW') 

480 """ 

481 

482 if table_names is not None and len(table_names) > 0: 482 ↛ 486line 482 didn't jump to line 486 because the condition on line 482 was always true

483 table_names_str = ", ".join([f"'{t.upper()}'" for t in table_names]) 

484 query += f" AND TABLE_NAME IN ({table_names_str})" 

485 

486 result = self.native_query(query) 

487 result.data_frame["ROW_COUNT"] = result.data_frame["ROW_COUNT"].astype(int) 

488 

489 return result 

490 

491 def meta_get_columns(self, table_names: Optional[List[str]] = None) -> Response: 

492 """ 

493 Retrieves column metadata for the specified tables (or all tables if no list is provided). 

494 

495 Args: 

496 table_names (list): A list of table names for which to retrieve column metadata. 

497 

498 Returns: 

499 Response: A response object containing the column metadata. 

500 """ 

501 query = """ 

502 SELECT 

503 TABLE_NAME, 

504 COLUMN_NAME, 

505 DATA_TYPE, 

506 COMMENT AS COLUMN_DESCRIPTION, 

507 COLUMN_DEFAULT, 

508 (IS_NULLABLE = 'YES') AS IS_NULLABLE, 

509 CHARACTER_MAXIMUM_LENGTH, 

510 CHARACTER_OCTET_LENGTH, 

511 NUMERIC_PRECISION, 

512 NUMERIC_SCALE, 

513 DATETIME_PRECISION, 

514 CHARACTER_SET_NAME, 

515 COLLATION_NAME 

516 FROM INFORMATION_SCHEMA.COLUMNS 

517 WHERE TABLE_SCHEMA = current_schema() 

518 """ 

519 

520 if table_names is not None and len(table_names) > 0: 520 ↛ 524line 520 didn't jump to line 524 because the condition on line 520 was always true

521 table_names_str = ", ".join([f"'{t.upper()}'" for t in table_names]) 

522 query += f" AND TABLE_NAME IN ({table_names_str})" 

523 

524 result = self.native_query(query) 

525 return result 

526 

527 def meta_get_column_statistics(self, table_names: Optional[List[str]] = None) -> Response: 

528 """ 

529 Retrieves basic column statistics: null %, distinct count. 

530 Due to Snowflake limitations, this runs per-table not per-column. 

531 TODO: Add most_common_values and most_common_frequencies 

532 """ 

533 columns_query = """ 

534 SELECT TABLE_NAME, COLUMN_NAME 

535 FROM INFORMATION_SCHEMA.COLUMNS 

536 WHERE TABLE_SCHEMA = current_schema() 

537 """ 

538 if table_names: 538 ↛ 542line 538 didn't jump to line 542 because the condition on line 538 was always true

539 table_names_str = ", ".join([f"'{t.upper()}'" for t in table_names]) 

540 columns_query += f" AND TABLE_NAME IN ({table_names_str})" 

541 

542 columns_result = self.native_query(columns_query) 

543 if ( 

544 columns_result.type == RESPONSE_TYPE.ERROR 

545 or columns_result.data_frame is None 

546 or columns_result.data_frame.empty 

547 ): 

548 return Response(RESPONSE_TYPE.ERROR, error_message="No columns found.") 

549 

550 columns_df = columns_result.data_frame 

551 grouped = columns_df.groupby("TABLE_NAME") 

552 all_stats = [] 

553 

554 for table_name, group in grouped: 

555 select_parts = [] 

556 for _, row in group.iterrows(): 

557 col = row["COLUMN_NAME"] 

558 # Ensure column names in the query are properly quoted if they contain special characters or are case-sensitive 

559 quoted_col = f'"{col}"' 

560 select_parts.extend( 

561 [ 

562 f'COUNT_IF({quoted_col} IS NULL) AS "nulls_{col}"', 

563 f'APPROX_COUNT_DISTINCT({quoted_col}) AS "distincts_{col}"', 

564 f'MIN({quoted_col}) AS "min_{col}"', 

565 f'MAX({quoted_col}) AS "max_{col}"', 

566 ] 

567 ) 

568 

569 quoted_table_name = f'"{table_name}"' 

570 stats_query = f""" 

571 SELECT COUNT(*) AS "total_rows", {", ".join(select_parts)} 

572 FROM {quoted_table_name} 

573 """ 

574 try: 

575 stats_res = self.native_query(stats_query) 

576 if stats_res.type != RESPONSE_TYPE.TABLE or stats_res.data_frame is None or stats_res.data_frame.empty: 576 ↛ 577line 576 didn't jump to line 577 because the condition on line 576 was never true

577 logger.warning( 

578 f"Could not retrieve stats for table {table_name}. Query returned no data or an error: {stats_res.error_message if stats_res.type == RESPONSE_TYPE.ERROR else 'No data'}" 

579 ) 

580 # Add placeholder stats if query fails or returns empty 

581 for _, row in group.iterrows(): 

582 all_stats.append( 

583 { 

584 "table_name": table_name, 

585 "column_name": row["COLUMN_NAME"], 

586 "null_percentage": None, 

587 "distinct_values_count": None, 

588 "most_common_values": [], 

589 "most_common_frequencies": [], 

590 "minimum_value": None, 

591 "maximum_value": None, 

592 } 

593 ) 

594 continue 

595 

596 stats_data = stats_res.data_frame.iloc[0] 

597 total_rows = stats_data.get("total_rows", 0) 

598 

599 for _, row in group.iterrows(): 

600 col = row["COLUMN_NAME"] 

601 # Keys for stats_data should match the aliases in stats_query (e.g., "nulls_COLNAME") 

602 nulls = stats_data.get(f"nulls_{col}", 0) 

603 distincts = stats_data.get(f"distincts_{col}", None) 

604 min_val = stats_data.get(f"min_{col}", None) 

605 max_val = stats_data.get(f"max_{col}", None) 

606 null_pct = (nulls / total_rows) * 100 if total_rows is not None and total_rows > 0 else None 

607 

608 all_stats.append( 

609 { 

610 "table_name": table_name, 

611 "column_name": col, 

612 "null_percentage": null_pct, 

613 "distinct_values_count": distincts, 

614 "most_common_values": [], 

615 "most_common_frequencies": [], 

616 "minimum_value": min_val, 

617 "maximum_value": max_val, 

618 } 

619 ) 

620 except Exception as e: 

621 logger.error(f"Exception while fetching statistics for table {table_name}: {e}") 

622 for _, row in group.iterrows(): 

623 all_stats.append( 

624 { 

625 "table_name": table_name, 

626 "column_name": row["COLUMN_NAME"], 

627 "null_percentage": None, 

628 "distinct_values_count": None, 

629 "most_common_values": [], 

630 "most_common_frequencies": [], 

631 "minimum_value": None, 

632 "maximum_value": None, 

633 } 

634 ) 

635 

636 if not all_stats: 636 ↛ 637line 636 didn't jump to line 637 because the condition on line 636 was never true

637 return Response(RESPONSE_TYPE.TABLE, data_frame=pandas.DataFrame()) 

638 

639 return Response(RESPONSE_TYPE.TABLE, data_frame=pandas.DataFrame(all_stats)) 

640 

641 def meta_get_primary_keys(self, table_names: Optional[List[str]] = None) -> Response: 

642 """ 

643 Retrieves primary key information for the specified tables (or all tables if no list is provided). 

644 

645 Args: 

646 table_names (list): A list of table names for which to retrieve primary key information. 

647 

648 Returns: 

649 Response: A response object containing the primary key information. 

650 """ 

651 try: 

652 query = """ 

653 SHOW PRIMARY KEYS IN TABLE; 

654 """ 

655 

656 response = self.native_query(query) 

657 if response.type == RESPONSE_TYPE.ERROR and response.error_message: 657 ↛ 658line 657 didn't jump to line 658 because the condition on line 657 was never true

658 logger.error(f"Query error in meta_get_primary_keys: {response.error_message}\nQuery:\n{query}") 

659 

660 df = response.data_frame 

661 if not df.empty: 661 ↛ 668line 661 didn't jump to line 668 because the condition on line 661 was always true

662 if table_names: 662 ↛ 665line 662 didn't jump to line 665 because the condition on line 662 was always true

663 df = df[df["table_name"].isin(table_names)] 

664 

665 df = df[["table_name", "column_name", "key_sequence", "constraint_name"]] 

666 df = df.rename(columns={"key_sequence": "ordinal_position"}) 

667 

668 response.data_frame = df 

669 

670 return response 

671 

672 except Exception as e: 

673 logger.error(f"Exception in meta_get_primary_keys: {e!r}") 

674 return Response(RESPONSE_TYPE.ERROR, error_message=f"Exception querying primary keys: {e!r}") 

675 

676 def meta_get_foreign_keys(self, table_names: Optional[List[str]] = None) -> Response: 

677 """ 

678 Retrieves foreign key information for the specified tables (or all tables if no list is provided). 

679 

680 Args: 

681 table_names (list): A list of table names for which to retrieve foreign key information. 

682 

683 Returns: 

684 Response: A response object containing the foreign key information. 

685 """ 

686 try: 

687 query = """ 

688 SHOW IMPORTED KEYS IN TABLE; 

689 """ 

690 

691 response = self.native_query(query) 

692 if response.type == RESPONSE_TYPE.ERROR and response.error_message: 692 ↛ 693line 692 didn't jump to line 693 because the condition on line 692 was never true

693 logger.error(f"Query error in meta_get_primary_keys: {response.error_message}\nQuery:\n{query}") 

694 

695 df = response.data_frame 

696 if not df.empty: 696 ↛ 710line 696 didn't jump to line 710 because the condition on line 696 was always true

697 if table_names: 697 ↛ 700line 697 didn't jump to line 700 because the condition on line 697 was always true

698 df = df[df["pk_table_name"].isin(table_names) & df["fk_table_name"].isin(table_names)] 

699 

700 df = df[["pk_table_name", "pk_column_name", "fk_table_name", "fk_column_name"]] 

701 df = df.rename( 

702 columns={ 

703 "pk_table_name": "child_table_name", 

704 "pk_column_name": "child_column_name", 

705 "fk_table_name": "parent_table_name", 

706 "fk_column_name": "parent_column_name", 

707 } 

708 ) 

709 

710 response.data_frame = df 

711 

712 return response 

713 

714 except Exception as e: 

715 logger.error(f"Exception in meta_get_primary_keys: {e!r}") 

716 return Response(RESPONSE_TYPE.ERROR, error_message=f"Exception querying primary keys: {e!r}") 

717 

718 def meta_get_handler_info(self, **kwargs: Any) -> str: 

719 """ 

720 Retrieves information about the design and implementation of the database handler. 

721 This should include, but not be limited to, the following: 

722 - The type of SQL queries and operations that the handler supports. 

723 - etc. 

724 

725 Args: 

726 kwargs: Additional keyword arguments that may be used in generating the handler information. 

727 

728 Returns: 

729 str: A string containing information about the database handler's design and implementation. 

730 """ 

731 return ( 

732 "To query columns that contain special characters, use ticks around the column name, e.g. `column name`.\n" 

733 "DO NOT use double quotes for this purpose." 

734 )